Skip to main content

Part-4 Asynchronous Data Communication Between Microservices Using RabbitMQ Message Broker With MassTransit[.NET6 Microservice Series]

In this article, we are going to establish asynchronous data communication between microservice using the RabbitMQ message broker along with MassTransit.

Now let's take look at the 'Orders' GET endpoint response.
Here we can understand we don't have 'Product' information. So we have to make a way to save the required 'Product' information into the 'SalesBusiness.API' microservice application from the 'Manufacture.API' microservice application.

Steps we are going to accomplish this demo are:
  • Configure RabbitMQ message broker with MassTransit for asynchronous data communication between the microservices.
  • In the 'Manufacture' database we have a 'Products' table(like Master table), now we will create a new 'Products' table(like child table) in 'SalesDatabase', but here we will add only required columns. So that while fetching the 'Orders' endpoint it is to fetch required products information.
  • So with help of RabbitMQ, we always maintain consistent or exact data between both parent and child tables.

Part-3 Create A Microservice For The Orders Endpoint[.NET6 Microservice Series]

RabbitMQ:

RabbitMQ is a message broker that helps to share data asynchronously between the publishers and receivers.
RabbitMQ exchange flow:
  • RabbitMQ supports different exchange types like 'Direct', 'Topic', 'Headers', 'Fanout'. By default RabitMQ uses 'Fanout' exchange, we also use this fanout exchange for our demo.
  • In Fanout exchange when a producer sends a message to RabbitMQ then the message will be sent to the 'FanoutExchange' and then the from the fanout exchange messages will be delivered to all queues then from the queued messages will be received by the respective receivers.

MassTransit:

MassTransit is free open-source software that can wrap around message brokers like 'RabbitMQ', 'Azure Service Bus', 'SQS', 'ActiveMQ Service Bus' etc.

MassTransit provides an easy way to configure the message brokers in our .NET applications.

RabbitMQ Docker-Compose:

step1:
Install Docker Desktop into your local machine.(https://www.docker.com/products/docker-desktop/)

step2:
Now create a folder of any name then in that folder add the 'docker-compose.yaml' file. Then add the following docker-compose configurations.
docker-compose.yaml:
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - 15672:15672
      - 4001:5672
    container_name: myrabbitcontainer
  • (Line: 1) The 'version' is the docker-compose document version.
  • (Line: 3) The 'services' represents software or service we are going to run on docker.
  • (Line: 4) The 'rabbitmq' custom name of our service, is recommended giving a meaningful name that represents the service.
  • (Line: 5) Docker image of 'RabbitMQ'.
  • (Line: 7) The right-hand side port '15672' runs the RabbitMQ admin tool.
  • (Line: 8) The right-hand side port '5672' runs the RabbitMQ service.
  • (Line: 9) Name of the container to run our 'RabbitMQ' image.
Step3:
Command to run the docker container.
docker-compose up -d

Create A Shared Library Contains DTOs For RabbitMQ Messages:

Our two microservices in which one will act as publisher and another will act as a consumer using the RabbitMQ message broker. So the 'Type' of the message will be a model(POCO class) that will be created in 'Shared Library' that will be consumed by both our Microservices applications.

Now create a .NET Library.
CLI command
dotnet new classlib -o Your_Project_Name

Note: Now add the newly created class library reference into both microservice applications.

Now in the shared library let's add a new folder like 'RabbitMqModels', next add a model(POCO class) like 'ProductCreated'(this type of our message shared by RabbitMq).
{Shared_Project}/RabbitMqModels/ProductCreated.cs:
namespace Shared.Models.RabbitMqModel;

public class ProductCreated
{
    public int Id { get; set; }
    public string Name { get; set; }
}

Install NuGet Packges:

To configure RabbitMQ and MassTransit we have to install the following NuGet Package into our both microservice applications.

Install 'MassTransit' NuGet Package.
CLI command
dotnet add package MassTransit --version 8.0.0

Package Manager
Install-Package MassTransit -Version 8.0.0

Install 'MassTransit.AspNetCore' NuGet Package.
CLI command
dotnet add package MassTransit.AspNetCore --version 7.3.1

Package Manager
Install-Package MassTransit.AspNetCore -Version 7.3.1

Install 'MassTransit.RabbitMQ'
CLI command
dotnet add package MassTransit.RabbitMQ --version 8.0.0

Package Manager
Install-Package MassTransit.RabbitMQ -Version 8.0.0

Publisher:

Our 'Manufacturer.API' microservice application will be the publisher.

Configure 'MassTransit' and 'RabbitMQ' services in to the 'Program.cs'.
Manufacturer.API/Program.cs:
builder.Services.AddMassTransit(options => {
    options.UsingRabbitMq((context,cfg) => {
        cfg.Host(new Uri("rabbitmq://localhost:4001"), h => {
            h.Username("guest");
            h.Password("guest");
        });
        
    });
});

  • (Line: 1) Registered the 'MassTransit' service.
  • (Line: 2) The 'RabbitMQ' service is configured inside of the 'MassTransit' service.
  • (Line: 3) Defined our 'RabbitMQ' host. Here port '4001' is my custom port exposed from the docker container.
  • (Line: 4&5) The default username and password for 'RabbitMQ' are 'guest'.
Inside of the 'ProductController' constructor let's inject the 'MassTransit.IPublishEndpoint'
Manufactures.API/Controllers/ProductController.cs:
using MassTransit;

// existing code hidden for display purpose

[ApiController]
[Route("[controller]")]
public class ProductsController : ControllerBase
{
    private readonly IPublishEndpoint _publishEndpoint;
    public ProductsController(
    IPublishEndpoint publishEndpoint)
    {
        
        _publishEndpoint = publishEndpoint;
    }
}
Now let's implement publish logic in our 'PostAsync' action method.
Manufactures.API/Controllers/ProductController.cs:
using Shared.Models.RabbitMqModel;

// existing code hidden for display purpose

[HttpPost]
public async Task<IActionResult> PostAsync(Products newProduct)
{
	_manufactureContext.Products.Add(newProduct);
	await _manufactureContext.SaveChangesAsync();
	await _publishEndpoint.Publish<ProductCreated>(new ProductCreated
	{
		Id = newProduct.Id,
		Name = newProduct.Name
	});

	return CreatedAtAction("Get", new { id = newProduct.Id }, newProduct);
}
  • (Line: 10-14) The 'IPublishEndpoint.Publish<T>()' method pushes the message into the 'Fanout Exchange' of RabbitMQ. The message of type is 'Shared.Models.RabbitMqMode.ProductCreated'.

Create A Child Products Table In SalesBusiness Database:

Let's create a child 'Products' table in the 'SalesBusiness' database.
CREATE TABLE [dbo].[Products](
	[Id] [int] NOT NULL,
	[Name] [varchar](500) NULL,
 CONSTRAINT [PK_Products] PRIMARY KEY CLUSTERED 
(
	[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

Configure Products Table Entity In SalesBusiness.API Project:

Let's add the 'Product' entity in our 'SalesBusiness.API' project
SalesBusiness.API/Data/Entities/Products.cs:
namespace SalesBusiness.Api.Data.Entities;

public class Products
{
    public int Id { get; set; }
    public string Name { get; set; }
}
Now register the 'Products' inside of the 'SalesBusinessContext'.
SalesBusiness.API/Data/SalesBusinesContext.cs:
using Microsoft.EntityFrameworkCore;
using SalesBusiness.Api.Data.Entities;

namespace SalesBusiness.Api.Data;
public class SalesBusinessContext: DbContext
{
    public SalesBusinessContext(DbContextOptions<SalesBusinessContext> options):base(options)
    {
        
    }
    public DbSet<Orders> Orders{get;set;}
    public DbSet<Products> Products{get;set;}
}

Consumer:

Our 'SalesBusiness.API' microservice application will be the consumer.

Using 'MassTransit.IConsumer<T>' we will make a RabbitMQ queue consumer. So let's create a folder like 'Consumer' and then add a class inside of it like 'ProductCreatedConsumer.cs'.
SalesBusiness.API/Consumer/ProductCreatedConsumer.cs:
using MassTransit;
using SalesBusiness.Api.Data;
using SalesBusiness.Api.Data.Entities;
using Shared.Models.RabbitMqModel;

namespace SalesBusiness.Api.Consumer;

public class ProductCreatedConsumer : IConsumer<ProductCreated>
{
    private readonly SalesBusinessContext _salesBusinessContext;
    public ProductCreatedConsumer(SalesBusinessContext salesBusinessContext)
    {
        _salesBusinessContext = salesBusinessContext;
    }
    public async Task Consume(ConsumeContext<ProductCreated> context)
    {
        var newProduct = new Products{
            Id = context.Message.Id,
            Name = context.Message.Name
        };
        _salesBusinessContext.Add(newProduct);
        await _salesBusinessContext.SaveChangesAsync();
    }
}
  • To make 'ProductCreatedConsumer' entity as a RabbitMQ queue consumer it must inherit the 'MassTransit.IConsumer'.
  • (Line: 15-23) Here implemented 'Consume' asynchronous method that's gets executed on every new message received by the queue. Inside of this method, our logic to store the message data has been implemented.
In 'Program.cs' let's register 'MassTransit' and 'RabbitMQ' services.
SalesBusiness.API/Programc.s:
builder.Services.AddMassTransit(x => {
    x.AddConsumer<ProductCreatedConsumer>();
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri("rabbitmq://localhost:4001"), h => {
            h.Username("guest");
            h.Password("guest");
        });
        cfg.ReceiveEndpoint("event-listener", e =>
        {
            e.ConfigureConsumer<ProductCreatedConsumer>(context);
        });
    });
});


  • (Line: 9-12) Here defined our channel or queue name like 'event-listener'. So 'RabbitMQ' Fanout exchange pushes messages to all queues. Here register our channel entity that is 'ProductCreatedConsumer' which listens for every new message from the RabbitMQ queue.

Test Microservices Communication With RabbitMQ Message Broker:

Step1:
Make sure our microservice application running under different port numbers. To change port number
goto 'Properties/launchsettings.json'.

Step2:
Make sure to run the Docker desktop application on our local machine and then run our docker-compose for 'RabbitMQ' as explained above.

Step3:
Let's try to create a new product from our Product Endpoint(Manufacture.API microservice application). In the action method we are publishing our new product record as a message to the RabbitMQ queue channel, so our 'SalesBusiness.API' project act as a consumer and reads the message and saves the message into our child 'Products' table in the 'SalesBusiness' database. This step represents asynchronous communication between our microservices.

Now we can observe data between '[Manufacture].[dbo].[Products]' and '[SalesBusiness].[dbo].[Products]' these 2 tables are synced because of RabbitMQ asynchronous communication between the microservices.

Update Orders GET Endpoint To Fetch Products Information:

First, let's create DTO for our 'Orders' like 'Orders.Dto' that contains 'Prodcut.Dto' as one of its properties.
SalesBusiness.API/DTOs/Orders.Dto.cs:
namespace SalesBusiness.Api.DTOs;

public class OrdersDto
{
    public int Id { get; set; }
    public string UserId { get; set; }
    public DateTime? OrderDate { get; set; }
    public ProductDto ProductInfo { get; set; }
}

public class ProductDto
{
    public int Id { get; set; }
    public string Name { get; set; }
}
Now let's update our Orders GET endpoint to fetch the 'Product' information along with the 'Orders'.
SalesBusiness.API/Controllers/OrdersController.cs:
[HttpGet]
public async Task<IActionResult> GetAsync()
{
	var orders = await _salesBusinessContext.Orders
	.Join(
		_salesBusinessContext.Products,
		order => order.ProductId,
		product => product.Id,
		(order, product) => new { Order = order, Product = product }
	)
	.Select(_ => new OrdersDto
	{
		Id = _.Order.Id,
		OrderDate = _.Order.OrderDate,
		UserId = _.Order.UserId,
		ProductInfo = new ProductDto
		{
			Id = _.Product.Id,
			Name = _.Product.Name
		}
	}).ToListAsync();
	return Ok(orders);
}
  • Here 'Orders' table is joined with the 'Product' table while fetching the collection of 'Orders'.
In the next article, we are going to implement the 'API Gateway'.

Support Me!
Buy Me A Coffee PayPal Me

Video Session:

Wrapping Up:

Hopefully, I think this article delivered some useful information Microservice flow in .NET6 applications. using I love to have your feedback, suggestions, and better techniques in the comment section below.

Refer:


Comments

Post a Comment

Popular posts from this blog

.NET6 Web API CRUD Operation With Entity Framework Core

In this article, we are going to do a small demo on AspNetCore 6 Web API CRUD operations. What Is Web API: Web API is a framework for building HTTP services that can be accessed from any client like browser, mobile devices, desktop apps. In simple terminology API(Application Programming Interface) means an interface module that contains a programming function that can be requested via HTTP calls to save or fetch the data for their respective clients. Some of the key characteristics of API: Supports HTTP verbs like 'GET', 'POST', 'PUT', 'DELETE', etc. Supports default responses like 'XML' and 'JSON'. Also can define custom responses. Supports self-hosting or individual hosting, so that all different kinds of apps can consume it. Authentication and Authorization are easy to implement. The ideal platform to build REST full services. Create A .NET6 Web API Application: Let's create a .Net6 Web API sample application to accomplish our

Usage Of CancellationToken In Asp.Net Core Applications

When To Use CancellationToken?: In a web application request abortion or orphan, requests are quite common. On users disconnected by network interruption or navigating between multiple pages before proper response or closing of the browser, tabs make the request aborted or orphan. An orphan request can't deliver a response to the client, but it will execute all steps(like database calls, HTTP calls, etc) at the server. Complete execution of an orphan request at the server might not be a problem generally if at all requests need to work on time taking a job at the server in those cases might be nice to terminate the execution immediately. So CancellationToken can be used to terminate a request execution at the server immediately once the request is aborted or orphan. Here we are going to see some sample code snippets about implementing a CancellationToken for Entity FrameworkCore, Dapper ORM, and HttpClient calls in Asp.NetCore MVC application. Note: The sample codes I will show in

A Small Guide On NestJS Queues

NestJS Application Queues helps to deal with application scaling and performance challenges. When To Use Queues?: API request that mostly involves in time taking operations like CPU bound operation, doing them synchronously which will result in thread blocking. So to avoid these issues, it is an appropriate way to make the CPU-bound operation separate background job.  In nestjs one of the best solutions for these kinds of tasks is to implement the Queues. For queueing mechanism in the nestjs application most recommended library is '@nestjs/bull'(Bull is nodejs queue library). The 'Bull' depends on Redis cache for data storage like a job. So in this queueing technique, we will create services like 'Producer' and 'Consumer'. The 'Producer' is used to push our jobs into the Redis stores. The consumer will read those jobs(eg: CPU Bound Operations) and process them. So by using this queues technique user requests processed very fastly because actually

Blazor WebAssembly Custom Authentication From Scratch

In this article, we are going to explore and implement custom authentication from the scratch. In this sample, we will use JWT authentication for user authentication. Main Building Blocks Of Blazor WebAssembly Authentication: The core concepts of blazor webassembly authentication are: AuthenticationStateProvider Service AuthorizeView Component Task<AuthenticationState> Cascading Property CascadingAuthenticationState Component AuthorizeRouteView Component AuthenticationStateProvider Service - this provider holds the authentication information about the login user. The 'GetAuthenticationStateAsync()' method in the Authentication state provider returns user AuthenticationState. The 'NotifyAuthenticationStateChaged()' to notify the latest user information within the components which using this AuthenticationStateProvider. AuthorizeView Component - displays different content depending on the user authorization state. This component uses the AuthenticationStateProvider

How Response Caching Works In Asp.Net Core

What Is Response Caching?: Response Caching means storing of response output and using stored response until it's under it's the expiration time. Response Caching approach cuts down some requests to the server and also reduces some workload on the server. Response Caching Headers: Response Caching carried out by the few Http based headers information between client and server. Main Response Caching Headers are like below Cache-Control Pragma Vary Cache-Control Header: Cache-Control header is the main header type for the response caching. Cache-Control will be decorated with the following directives. public - this directive indicates any cache may store the response. private - this directive allows to store response with respect to a single user and can't be stored with shared cache stores. max-age - this directive represents a time to hold a response in the cache. no-cache - this directive represents no storing of response and always fetch the fr

Part-1 Angular JWT Authentication Using HTTP Only Cookie[Angular V13]

In this article, we are going to implement a sample angular application authentication using HTTP only cookie that contains a JWT token. HTTP Only JWT Cookie: In a SPA(Single Page Application) Authentication JWT token either can be stored in browser 'LocalStorage' or in 'Cookie'. Storing JWT token inside of the cookie then the cookie should be HTTP Only. The HTTP-Only cookie nature is that it will be only accessible by the server application. Client apps like javascript-based apps can't access the HTTP-Only cookie. So if we use authentication with HTTP only JWT cookie then we no need to implement custom logic like adding authorization header or storing token data, etc at our client application. Because once the user authenticated cookie will be automatically sent to the server by the browser on every API call. Authentication API: To implement JWT cookie authentication we need to set up an API. For that, I had created a mock authentication API(Using the NestJS Se

Unit Testing Asp.NetCore Web API Using xUnit[.NET6]

In this article, we are going to write test cases to an Asp.NetCore Web API(.NET6) application using the xUnit. xUnit For .NET: The xUnit for .Net is a free, open-source, community-focused unit testing tool for .NET applications. By default .Net also provides a xUnit project template to implement test cases. Unit test cases build upon the 'AAA' formula that means 'Arrange', 'Act' and 'Assert' Arrange - Declaring variables, objects, instantiating mocks, etc. Act - Calling or invoking the method that needs to be tested. Assert - The assert ensures that code behaves as expected means yielding expected output. Create An API And Unit Test Projects: Let's create a .Net6 Web API and xUnit sample applications to accomplish our demo. We can use either Visual Studio 2022 or Visual Studio Code(using .NET CLI commands) to create any.Net6 application. For this demo, I'm using the 'Visual Studio Code'(using the .NET CLI command) editor. Create a fo

Different HttpClient Techniques To Consume API Calls In Minimal API[.NET6]

In this article, we are going to implement different HttpClient techniques to consume API calls in minimal API. The different HttpClient techniques that we are going to explore are like: Register HttpClient Object Explicitly In DI(Dependency Injection Service) Named Client Type Client HttpRequestMessage Object Create A .NET6 Minimal API Project: Let's create a .Net6 Minimal API sample project to accomplish our demo. We can use either Visual Studio 2022 or Visual Studio Code(using .NET CLI commands) to create any.Net6 application. For this demo, I'm using the 'Visual Studio Code'(using the .NET CLI command) editor. CLI command For Minimal API Project dotnet new webapi -minimal -o Your_Project_Name Create A Third Party API Response Model: Here I'm going to use a free third-party rest API that is "https://jsonplaceholder.typicode.com/posts". So to receive the response let's create a response model like 'Post.cs'. Program.cs:(Add Post.cs c

.Net5 Web API Managing Files Using Azure Blob Storage

In this article, we are going to understand the different file operations like uploading, reading, downloading, and deleting in .Net5 Web API application using Azure Blob Storage. Azure Blob Storage: Azure blob storage is Microsoft cloud storage. Blob storage can store a massive amount of file data as unstructured data. The unstructured data means not belong to any specific type, which means text or binary data. So something like images or pdf or videos to store in the cloud, then the most recommended is to use the blob store. The key component to creating azure blob storage resource: Storage Account:- A Storage account gives a unique namespace in Azure for all the data we will save. Every object that we store in Azure Storage has an address. The address is nothing but the unique name of our Storage Account name. The combination of the account name and the Azure Storage blob endpoint forms the base address for each object in our Storage account. For example, if our Storage Account is n

.Net5 Web API Redis Cache Using StackExchange.Redis.Extensions.AspNetCore Library

In this article, we are going to explore the integration of Redis cache in .Net5 Web API application using the 'StackExchange.Redis.Exntensions' library. Note:- Microsoft has introduced an 'IDistributedCache' interface in dotnet core which supports different cache stores like In-Memory, Redis, NCache, etc. It is simple and easy to work with  'IDistributedCache', for the Redis store with limited features but if we want more features of the Redis store we can choose to use 'StackExchange.Redis.Extensions'.  Click here for Redis Cache Integration Using IDistributedCache Interface . Overview On StackExchange.Redis.Extnesions Library: The 'StackExchange.Redis.Extension' library extended from the main library 'StackExchange.Redis'. Some of the key features of this library like: Default serialization and deserialization. Easy to save and fetch complex objects. Search key. Multiple Database Access Setup Redis Docker Instance: For this sampl