Skip to main content

Part-4 Asynchronous Data Communication Between Microservices Using RabbitMQ Message Broker With MassTransit[.NET6 Microservice Series]

In this article, we are going to establish asynchronous data communication between microservice using the RabbitMQ message broker along with MassTransit.

Now let's take look at the 'Orders' GET endpoint response.
Here we can understand we don't have 'Product' information. So we have to make a way to save the required 'Product' information into the 'SalesBusiness.API' microservice application from the 'Manufacture.API' microservice application.

Steps we are going to accomplish this demo are:
  • Configure RabbitMQ message broker with MassTransit for asynchronous data communication between the microservices.
  • In the 'Manufacture' database we have a 'Products' table(like Master table), now we will create a new 'Products' table(like child table) in 'SalesDatabase', but here we will add only required columns. So that while fetching the 'Orders' endpoint it is to fetch required products information.
  • So with help of RabbitMQ, we always maintain consistent or exact data between both parent and child tables.

Part-3 Create A Microservice For The Orders Endpoint[.NET6 Microservice Series]


RabbitMQ is a message broker that helps to share data asynchronously between the publishers and receivers.
RabbitMQ exchange flow:
  • RabbitMQ supports different exchange types like 'Direct', 'Topic', 'Headers', 'Fanout'. By default RabitMQ uses 'Fanout' exchange, we also use this fanout exchange for our demo.
  • In Fanout exchange when a producer sends a message to RabbitMQ then the message will be sent to the 'FanoutExchange' and then the from the fanout exchange messages will be delivered to all queues then from the queued messages will be received by the respective receivers.


MassTransit is free open-source software that can wrap around message brokers like 'RabbitMQ', 'Azure Service Bus', 'SQS', 'ActiveMQ Service Bus' etc.

MassTransit provides an easy way to configure the message brokers in our .NET applications.

RabbitMQ Docker-Compose:

Install Docker Desktop into your local machine.(

Now create a folder of any name then in that folder add the 'docker-compose.yaml' file. Then add the following docker-compose configurations.
version: '3.8'

    image: rabbitmq:3-management
      - 15672:15672
      - 4001:5672
    container_name: myrabbitcontainer
  • (Line: 1) The 'version' is the docker-compose document version.
  • (Line: 3) The 'services' represents software or service we are going to run on docker.
  • (Line: 4) The 'rabbitmq' custom name of our service, is recommended giving a meaningful name that represents the service.
  • (Line: 5) Docker image of 'RabbitMQ'.
  • (Line: 7) The right-hand side port '15672' runs the RabbitMQ admin tool.
  • (Line: 8) The right-hand side port '5672' runs the RabbitMQ service.
  • (Line: 9) Name of the container to run our 'RabbitMQ' image.
Command to run the docker container.
docker-compose up -d

Create A Shared Library Contains DTOs For RabbitMQ Messages:

Our two microservices in which one will act as publisher and another will act as a consumer using the RabbitMQ message broker. So the 'Type' of the message will be a model(POCO class) that will be created in 'Shared Library' that will be consumed by both our Microservices applications.

Now create a .NET Library.
CLI command
dotnet new classlib -o Your_Project_Name

Note: Now add the newly created class library reference into both microservice applications.

Now in the shared library let's add a new folder like 'RabbitMqModels', next add a model(POCO class) like 'ProductCreated'(this type of our message shared by RabbitMq).
namespace Shared.Models.RabbitMqModel;

public class ProductCreated
    public int Id { get; set; }
    public string Name { get; set; }

Install NuGet Packges:

To configure RabbitMQ and MassTransit we have to install the following NuGet Package into our both microservice applications.

Install 'MassTransit' NuGet Package.
CLI command
dotnet add package MassTransit --version 8.0.0

Package Manager
Install-Package MassTransit -Version 8.0.0

Install 'MassTransit.AspNetCore' NuGet Package.
CLI command
dotnet add package MassTransit.AspNetCore --version 7.3.1

Package Manager
Install-Package MassTransit.AspNetCore -Version 7.3.1

Install 'MassTransit.RabbitMQ'
CLI command
dotnet add package MassTransit.RabbitMQ --version 8.0.0

Package Manager
Install-Package MassTransit.RabbitMQ -Version 8.0.0


Our 'Manufacturer.API' microservice application will be the publisher.

Configure 'MassTransit' and 'RabbitMQ' services in to the 'Program.cs'.
builder.Services.AddMassTransit(options => {
    options.UsingRabbitMq((context,cfg) => {
        cfg.Host(new Uri("rabbitmq://localhost:4001"), h => {

  • (Line: 1) Registered the 'MassTransit' service.
  • (Line: 2) The 'RabbitMQ' service is configured inside of the 'MassTransit' service.
  • (Line: 3) Defined our 'RabbitMQ' host. Here port '4001' is my custom port exposed from the docker container.
  • (Line: 4&5) The default username and password for 'RabbitMQ' are 'guest'.
Inside of the 'ProductController' constructor let's inject the 'MassTransit.IPublishEndpoint'
using MassTransit;

// existing code hidden for display purpose

public class ProductsController : ControllerBase
    private readonly IPublishEndpoint _publishEndpoint;
    public ProductsController(
    IPublishEndpoint publishEndpoint)
        _publishEndpoint = publishEndpoint;
Now let's implement publish logic in our 'PostAsync' action method.
using Shared.Models.RabbitMqModel;

// existing code hidden for display purpose

public async Task<IActionResult> PostAsync(Products newProduct)
	await _manufactureContext.SaveChangesAsync();
	await _publishEndpoint.Publish<ProductCreated>(new ProductCreated
		Id = newProduct.Id,
		Name = newProduct.Name

	return CreatedAtAction("Get", new { id = newProduct.Id }, newProduct);
  • (Line: 10-14) The 'IPublishEndpoint.Publish<T>()' method pushes the message into the 'Fanout Exchange' of RabbitMQ. The message of type is 'Shared.Models.RabbitMqMode.ProductCreated'.

Create A Child Products Table In SalesBusiness Database:

Let's create a child 'Products' table in the 'SalesBusiness' database.
CREATE TABLE [dbo].[Products](
	[Id] [int] NOT NULL,
	[Name] [varchar](500) NULL,
	[Id] ASC

Configure Products Table Entity In SalesBusiness.API Project:

Let's add the 'Product' entity in our 'SalesBusiness.API' project
namespace SalesBusiness.Api.Data.Entities;

public class Products
    public int Id { get; set; }
    public string Name { get; set; }
Now register the 'Products' inside of the 'SalesBusinessContext'.
using Microsoft.EntityFrameworkCore;
using SalesBusiness.Api.Data.Entities;

namespace SalesBusiness.Api.Data;
public class SalesBusinessContext: DbContext
    public SalesBusinessContext(DbContextOptions<SalesBusinessContext> options):base(options)
    public DbSet<Orders> Orders{get;set;}
    public DbSet<Products> Products{get;set;}


Our 'SalesBusiness.API' microservice application will be the consumer.

Using 'MassTransit.IConsumer<T>' we will make a RabbitMQ queue consumer. So let's create a folder like 'Consumer' and then add a class inside of it like 'ProductCreatedConsumer.cs'.
using MassTransit;
using SalesBusiness.Api.Data;
using SalesBusiness.Api.Data.Entities;
using Shared.Models.RabbitMqModel;

namespace SalesBusiness.Api.Consumer;

public class ProductCreatedConsumer : IConsumer<ProductCreated>
    private readonly SalesBusinessContext _salesBusinessContext;
    public ProductCreatedConsumer(SalesBusinessContext salesBusinessContext)
        _salesBusinessContext = salesBusinessContext;
    public async Task Consume(ConsumeContext<ProductCreated> context)
        var newProduct = new Products{
            Id = context.Message.Id,
            Name = context.Message.Name
        await _salesBusinessContext.SaveChangesAsync();
  • To make 'ProductCreatedConsumer' entity as a RabbitMQ queue consumer it must inherit the 'MassTransit.IConsumer'.
  • (Line: 15-23) Here implemented 'Consume' asynchronous method that's gets executed on every new message received by the queue. Inside of this method, our logic to store the message data has been implemented.
In 'Program.cs' let's register 'MassTransit' and 'RabbitMQ' services.
builder.Services.AddMassTransit(x => {
    x.UsingRabbitMq((context, cfg) =>
        cfg.Host(new Uri("rabbitmq://localhost:4001"), h => {
        cfg.ReceiveEndpoint("event-listener", e =>

  • (Line: 9-12) Here defined our channel or queue name like 'event-listener'. So 'RabbitMQ' Fanout exchange pushes messages to all queues. Here register our channel entity that is 'ProductCreatedConsumer' which listens for every new message from the RabbitMQ queue.

Test Microservices Communication With RabbitMQ Message Broker:

Make sure our microservice application running under different port numbers. To change port number
goto 'Properties/launchsettings.json'.

Make sure to run the Docker desktop application on our local machine and then run our docker-compose for 'RabbitMQ' as explained above.

Let's try to create a new product from our Product Endpoint(Manufacture.API microservice application). In the action method we are publishing our new product record as a message to the RabbitMQ queue channel, so our 'SalesBusiness.API' project act as a consumer and reads the message and saves the message into our child 'Products' table in the 'SalesBusiness' database. This step represents asynchronous communication between our microservices.

Now we can observe data between '[Manufacture].[dbo].[Products]' and '[SalesBusiness].[dbo].[Products]' these 2 tables are synced because of RabbitMQ asynchronous communication between the microservices.

Update Orders GET Endpoint To Fetch Products Information:

First, let's create DTO for our 'Orders' like 'Orders.Dto' that contains 'Prodcut.Dto' as one of its properties.
namespace SalesBusiness.Api.DTOs;

public class OrdersDto
    public int Id { get; set; }
    public string UserId { get; set; }
    public DateTime? OrderDate { get; set; }
    public ProductDto ProductInfo { get; set; }

public class ProductDto
    public int Id { get; set; }
    public string Name { get; set; }
Now let's update our Orders GET endpoint to fetch the 'Product' information along with the 'Orders'.
public async Task<IActionResult> GetAsync()
	var orders = await _salesBusinessContext.Orders
		order => order.ProductId,
		product => product.Id,
		(order, product) => new { Order = order, Product = product }
	.Select(_ => new OrdersDto
		Id = _.Order.Id,
		OrderDate = _.Order.OrderDate,
		UserId = _.Order.UserId,
		ProductInfo = new ProductDto
			Id = _.Product.Id,
			Name = _.Product.Name
	return Ok(orders);
  • Here 'Orders' table is joined with the 'Product' table while fetching the collection of 'Orders'.
In the next article, we are going to implement the 'API Gateway'.

Support Me!
Buy Me A Coffee PayPal Me

Video Session:

Wrapping Up:

Hopefully, I think this article delivered some useful information Microservice flow in .NET6 applications. using I love to have your feedback, suggestions, and better techniques in the comment section below.



Post a Comment

Popular posts from this blog

.NET6 Web API CRUD Operation With Entity Framework Core

In this article, we are going to do a small demo on AspNetCore 6 Web API CRUD operations. What Is Web API: Web API is a framework for building HTTP services that can be accessed from any client like browser, mobile devices, desktop apps. In simple terminology API(Application Programming Interface) means an interface module that contains a programming function that can be requested via HTTP calls to save or fetch the data for their respective clients. Some of the key characteristics of API: Supports HTTP verbs like 'GET', 'POST', 'PUT', 'DELETE', etc. Supports default responses like 'XML' and 'JSON'. Also can define custom responses. Supports self-hosting or individual hosting, so that all different kinds of apps can consume it. Authentication and Authorization are easy to implement. The ideal platform to build REST full services. Create A .NET6 Web API Application: Let's create a .Net6 Web API sample application to accomplish our

Angular 14 Reactive Forms Example

In this article, we will explore the Angular(14) reactive forms with an example. Reactive Forms: Angular reactive forms support model-driven techniques to handle the form's input values. The reactive forms state is immutable, any form filed change creates a new state for the form. Reactive forms are built around observable streams, where form inputs and values are provided as streams of input values, which can be accessed synchronously. Some key notations that involve in reactive forms are like: FormControl - each input element in the form is 'FormControl'. The 'FormControl' tracks the value and validation status of form fields. FormGroup - Track the value and validate the state of the group of 'FormControl'. FormBuilder - Angular service which can be used to create the 'FormGroup' or FormControl instance quickly. Form Array - That can hold infinite form control, this helps to create dynamic forms. Create An Angular(14) Application: Let'

Part-1 Angular JWT Authentication Using HTTP Only Cookie[Angular V13]

In this article, we are going to implement a sample angular application authentication using HTTP only cookie that contains a JWT token. HTTP Only JWT Cookie: In a SPA(Single Page Application) Authentication JWT token either can be stored in browser 'LocalStorage' or in 'Cookie'. Storing JWT token inside of the cookie then the cookie should be HTTP Only. The HTTP-Only cookie nature is that it will be only accessible by the server application. Client apps like javascript-based apps can't access the HTTP-Only cookie. So if we use authentication with HTTP only JWT cookie then we no need to implement custom logic like adding authorization header or storing token data, etc at our client application. Because once the user authenticated cookie will be automatically sent to the server by the browser on every API call. Authentication API: To implement JWT cookie authentication we need to set up an API. For that, I had created a mock authentication API(Using the NestJS Se

Unit Testing Asp.NetCore Web API Using xUnit[.NET6]

In this article, we are going to write test cases to an Asp.NetCore Web API(.NET6) application using the xUnit. xUnit For .NET: The xUnit for .Net is a free, open-source, community-focused unit testing tool for .NET applications. By default .Net also provides a xUnit project template to implement test cases. Unit test cases build upon the 'AAA' formula that means 'Arrange', 'Act' and 'Assert' Arrange - Declaring variables, objects, instantiating mocks, etc. Act - Calling or invoking the method that needs to be tested. Assert - The assert ensures that code behaves as expected means yielding expected output. Create An API And Unit Test Projects: Let's create a .Net6 Web API and xUnit sample applications to accomplish our demo. We can use either Visual Studio 2022 or Visual Studio Code(using .NET CLI commands) to create any.Net6 application. For this demo, I'm using the 'Visual Studio Code'(using the .NET CLI command) editor. Create a fo

A Small Guide On NestJS Queues

NestJS Application Queues helps to deal with application scaling and performance challenges. When To Use Queues?: API request that mostly involves in time taking operations like CPU bound operation, doing them synchronously which will result in thread blocking. So to avoid these issues, it is an appropriate way to make the CPU-bound operation separate background job.  In nestjs one of the best solutions for these kinds of tasks is to implement the Queues. For queueing mechanism in the nestjs application most recommended library is '@nestjs/bull'(Bull is nodejs queue library). The 'Bull' depends on Redis cache for data storage like a job. So in this queueing technique, we will create services like 'Producer' and 'Consumer'. The 'Producer' is used to push our jobs into the Redis stores. The consumer will read those jobs(eg: CPU Bound Operations) and process them. So by using this queues technique user requests processed very fastly because actually

Usage Of CancellationToken In Asp.Net Core Applications

When To Use CancellationToken?: In a web application request abortion or orphan, requests are quite common. On users disconnected by network interruption or navigating between multiple pages before proper response or closing of the browser, tabs make the request aborted or orphan. An orphan request can't deliver a response to the client, but it will execute all steps(like database calls, HTTP calls, etc) at the server. Complete execution of an orphan request at the server might not be a problem generally if at all requests need to work on time taking a job at the server in those cases might be nice to terminate the execution immediately. So CancellationToken can be used to terminate a request execution at the server immediately once the request is aborted or orphan. Here we are going to see some sample code snippets about implementing a CancellationToken for Entity FrameworkCore, Dapper ORM, and HttpClient calls in Asp.NetCore MVC application. Note: The sample codes I will show in

Blazor WebAssembly Custom Authentication From Scratch

In this article, we are going to explore and implement custom authentication from the scratch. In this sample, we will use JWT authentication for user authentication. Main Building Blocks Of Blazor WebAssembly Authentication: The core concepts of blazor webassembly authentication are: AuthenticationStateProvider Service AuthorizeView Component Task<AuthenticationState> Cascading Property CascadingAuthenticationState Component AuthorizeRouteView Component AuthenticationStateProvider Service - this provider holds the authentication information about the login user. The 'GetAuthenticationStateAsync()' method in the Authentication state provider returns user AuthenticationState. The 'NotifyAuthenticationStateChaged()' to notify the latest user information within the components which using this AuthenticationStateProvider. AuthorizeView Component - displays different content depending on the user authorization state. This component uses the AuthenticationStateProvider

How Response Caching Works In Asp.Net Core

What Is Response Caching?: Response Caching means storing of response output and using stored response until it's under it's the expiration time. Response Caching approach cuts down some requests to the server and also reduces some workload on the server. Response Caching Headers: Response Caching carried out by the few Http based headers information between client and server. Main Response Caching Headers are like below Cache-Control Pragma Vary Cache-Control Header: Cache-Control header is the main header type for the response caching. Cache-Control will be decorated with the following directives. public - this directive indicates any cache may store the response. private - this directive allows to store response with respect to a single user and can't be stored with shared cache stores. max-age - this directive represents a time to hold a response in the cache. no-cache - this directive represents no storing of response and always fetch the fr

Angular 14 State Management CRUD Example With NgRx(14)

In this article, we are going to implement the Angular(14) state management CRUD example with NgRx(14) NgRx Store For State Management: In an angular application to share consistent data between multiple components, we use NgRx state management. Using NgRx state helps to avoid unwanted API calls, easy to maintain consistent data, etc. The main building blocks for the NgRx store are: Actions - NgRx actions represents event to trigger the reducers to save the data into the stores. Reducer - Reducer's pure function, which is used to create a new state on data change. Store - The store is the model or entity that holds the data. Selector - Selector to fetch the slices of data from the store to angular components. Effects - Effects deals with external network calls like API. The effect gets executed based the action performed Ngrx State Management flow: The angular component needs data for binding.  So angular component calls an action that is responsible for invoking the API call.  Aft

Angular 14 Crud Example

In this article, we will implement CRUD operation in the Angular 14 application. Angular: Angular is a framework that can be used to build a single-page application. Angular applications are built with components that make our code simple and clean. Angular components compose of 3 files like TypeScript File(*.ts), Html File(*.html), CSS File(*.cs) Components typescript file and HTML file support 2-way binding which means data flow is bi-directional Component typescript file listens for all HTML events from the HTML file. Create Angular(14) Application: Let's create an Angular(14) application to begin our sample. Make sure to install the Angular CLI tool into our local machine because it provides easy CLI commands to play with the angular application. Command To Install Angular CLI npm install -g @angular/cli Run the below command to create the angular application. Command To Create Angular Application ng new name_of_your_app Note: While creating the app, you will see a noti