Setting up MassTransit with .Net Core 3, AutoFac & RabbitMq

In this post, I want to set up MassTransit with RabbitMq and .Net Core 3. My application already uses Autofac as its DI container, so I will take advantage of that here. All the information needed to get MassTransit set up is out there, but it is spread across the documentation, GitHub issues, StackOverflow and all the other usual places. I want to bring the information I needed to get it working together in one place.

There will be 2 services that communicate with each other across the bus. The first will be an API, and the second will be a Service for application logic, working with a DB or whatever else you want it to do.

We will set up a Ping Controller in the API that will dispatch a command to the service via MassTransit. The Service will then take the command off the bus and consume it, before publishing an Event indicating that the Ping was consumed. The API will subscribe to this Event and listen for it before returning a synchronous result that includes the time that the process took.

Setup

Looking back, there isn’t a lot involved in configuring a basic MassTransit setup. There was only one gotcha, which probably should be included in the .Net Core setup guide, but as usual the answer was found on StackOverflow.

For .Net Core, there are a few ways of configuring MassTransit. I went with configuring it in an Autofac module. I liked this approach because:

  1. I’m using Autofac
  2. It keeps my Startup.cs file cleaner
  3. It’s easy to swap out the module, I’ll talk more about this later.

There were 2 changes that I needed to make to my Startup.cs. Let’s start with the gotcha, which is to add the MassTransit Hosted Service. At the time of writing, this wasn’t documented on the MassTransit page. I found the answer here on StackOverflow.

public void ConfigureServices(IServiceCollection services)
{
  ...
  services.AddMassTransitHostedService();
}

I did a little digging through the source code, and I think I found a brief reference in the MassTransit docs, although it doesn’t explicitly mention this method. As far as I can tell, it registers a hosted service that starts the bus with the application (and stops it on shutdown) and wires up some health checks for the bus.

The 2nd change I made was to add ConfigureContainer to Startup.cs. This allows us to configure the bus when AutoFac is loaded, during the application startup. This is documented pretty well in the AutoFac docs.

public void ConfigureContainer(ContainerBuilder builder)
{
  builder.RegisterModule(new AutofacModule());
  builder.RegisterModule<BusModule>();
  // builder.RegisterModule<ConsumerModule>();
}

This code registers my modules with the Autofac container, including the BusModule, which is where my Bus is configured. You will notice a commented-out ConsumerModule that I wanted to use to register consumers. The documentation suggested that this would be possible, but when I tried it out there were errors in the code sample from the docs, and a comment in that sample said they weren’t sure whether it was possible.

In the end, I configured the consumers in the Bus configuration. I’ll show how I did this later.
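
One thing the Startup.cs snippets above take for granted is that the host has been told to use Autofac; without that, ConfigureContainer is never called. A minimal sketch of a Program.cs for ASP.NET Core 3, assuming the Autofac.Extensions.DependencyInjection package is installed and that Startup is the class shown above:

```csharp
using Autofac.Extensions.DependencyInjection;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;

public class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            // Swap the default service provider for Autofac so that
            // Startup.ConfigureContainer(ContainerBuilder) is invoked.
            .UseServiceProviderFactory(new AutofacServiceProviderFactory())
            .ConfigureWebHostDefaults(webBuilder =>
            {
                webBuilder.UseStartup<Startup>();
            });
}
```

With this in place, the runtime calls ConfigureServices first and ConfigureContainer afterwards, so anything registered in the Autofac modules can override the default registrations.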

Configuring the Bus

I configured the Bus in an AutoFac module. Inside the module, there are 4 parts to configuring MassTransit.

  1. Add MassTransit to the ContainerBuilder
  2. Add a Bus to MassTransit
  3. Configure the host
  4. Configure the ReceiveEndpoint

The code for this looks like the following.

public class BusModule : Module
  {
    protected override void Load(ContainerBuilder builder)
    {
      var config = ConfigurationHelper.GetConfiguration();
      var serviceBusHost = config.GetConnectionString(BusSettings.ServiceBusConnection);
      var massTransitConfig = config.GetSection(BusSettings.MassTransitConfig);
      var hostUri = new Uri(serviceBusHost);

      builder.AddMassTransit(x =>
      {
        // Add Consumers
        x.AddConsumer<EventsPingConsumer>();

        // Bus Configuration
        x.AddBus(context =>
        {
          var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
          {
            cfg.Host(hostUri, host =>
            {
              host.Username(massTransitConfig[BusSettings.UserName]);
              host.Password(massTransitConfig[BusSettings.Password]);
            });

            cfg.ReceiveEndpoint(BusSettings.EventsService,
              ep =>
              {
                // Configure Endpoint
                ep.PrefetchCount = 16;
                ep.UseMessageRetry(r => r.Interval(2, 100));

                // Configure Consumers
                ep.ConfigureConsumer<EventsPingConsumer>(context);
              });
          });

          return bus;
        });
      });
    }
  }

The first part, AddMassTransit, does what it says and adds MassTransit to the builder. Inside this, we add a bus and tell it what type of bus it will be. In this case, I used RabbitMq. After that, the host is configured with the URI and the user name & password needed to connect to it. Finally, we set up the ReceiveEndpoint, where we can set the retry policy and prefetch count, among other config items. This is also where you attach your consumers to the endpoint. I will talk about this a bit more later on in this post.
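
The BusModule relies on two helpers that are not shown in this post, BusSettings and ConfigurationHelper. The following is only a guess at what they could look like: every key name and value below is an assumption for illustration, not the real contents of my project.

```csharp
using System.IO;
using Microsoft.Extensions.Configuration;

// Hypothetical key names; the real values are not shown in the post.
public static class BusSettings
{
    public const string ServiceBusConnection = "ServiceBus";  // ConnectionStrings:ServiceBus
    public const string MassTransitConfig = "MassTransit";    // section holding the credentials
    public const string UserName = "UserName";
    public const string Password = "Password";
    public const string EventsService = "events-service";     // assumed queue name for the service
}

public static class ConfigurationHelper
{
    // appsettings.json shape expected under the assumptions above:
    // {
    //   "ConnectionStrings": { "ServiceBus": "rabbitmq://localhost/" },
    //   "MassTransit": { "UserName": "appUser", "Password": "..." }
    // }
    public static IConfiguration GetConfiguration() =>
        new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: false, reloadOnChange: false)
            .AddEnvironmentVariables() // lets Docker or the host override the file
            .Build();
}
```

Keeping the key names in one static class means the BusModule and the endpoint mapping read the same settings, so a renamed queue only has to change in one place.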

RabbitMq

I’m not going to go into a lot of detail on RabbitMq for 2 reasons.

  1. I don’t think I’ll use it in production
  2. I don’t know much about it.

The reason I don’t think I’ll use it in production is that I’m a solo developer with a small budget. I want to keep infrastructure maintenance & cost to a minimum, so cheap PaaS is my preference for everything. I will probably switch to Azure Service Bus, but since I can’t run that locally I am using RabbitMq for development.

I have it running in a Docker container, but I also tried out CloudAMQP. CloudAMQP is really easy to set up and there is a free development environment option available. I would really recommend it if you want to keep it online.

If you are using Docker, make sure you use the image that has the RabbitMq Management Portal set up. It is useful for confirming your setup and testing that messages and events are getting put in the correct queue.

Here is my config for RabbitMq in Docker-Compose.

rabbitmq:
    image: "rabbitmq:3-management-alpine"
    environment:
      RABBITMQ_DEFAULT_USER: "appUser"
      RABBITMQ_DEFAULT_PASS: "myBigDevPassword!"
      RABBITMQ_ERLANG_COOKIE: "secret_cookie"
    volumes:
      - rabbitmq-volume/:/var/lib/rabbitmq/
    ports:
      - "5672:5672"
      - "15672:15672"
    networks:
      - backend
...
volumes:
  ...
  rabbitmq-volume:

There are a few things that you should check through the portal:

  1. That the exchanges for your application have been created
  2. That each exchange is bound to the correct queue

Exchange

MassTransit will create the exchanges and queues and bind them all together based on your bus configuration. Here you will see an exchange for each queue that is specified as a receive endpoint and one for each of the event types.

Binding

These are the bindings for my API. This screen shows that the API is subscribed to the EventsPingCompleted event and listens on the events-api queue.

Send Command

Register Endpoints – Before a command can be sent, it needs to be associated with the endpoint that it is to be sent to. According to the MassTransit documentation, this can be done anywhere in the project.

For now I put it in an Autofac dependency injection module and load it in the ConfigureContainer method of Startup.cs after the BusModule is loaded. This is easily done with the following code. I’m not sure if this is the best way of doing it, but it works.

public class EndpointConventionModule: Module
{
  protected override void Load(ContainerBuilder builder)
  {
    var config = ConfigurationHelper.GetConfiguration();
    var host = new Uri(config.GetConnectionString(BusSettings.ServiceBusConnection));

    EndpointConvention.Map<EventsPing>(new Uri(host, BusSettings.EventsService));
  }
}

This will globally configure the endpoint for this type of command, so when you send a command you don’t need to specify the endpoint.

Send Commands – Two of the ways you can send commands are Send, which is fire-and-forget, and Request, which is kind of synchronous: you specify the type you are sending and a response type that can be awaited.

await _bus.Send(ping).ConfigureAwait(false);

var response = await _bus.Request<EventsPing, EventsPingCompleted>(ping).ConfigureAwait(false);
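
To put the Request call in context, here is a rough sketch of what the Ping Controller described at the start of the post could look like. The route, the controller name and the shape of the result are assumptions; EventsPing and EventsPingCompleted are the message types used throughout this post, and IBus is resolved from the container that AddMassTransit configured.

```csharp
using System.Diagnostics;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.AspNetCore.Mvc;

[ApiController]
[Route("api/[controller]")]
public class PingController : ControllerBase
{
    private readonly IBus _bus;

    public PingController(IBus bus)
    {
        _bus = bus;
    }

    [HttpGet]
    public async Task<IActionResult> Get()
    {
        var stopwatch = Stopwatch.StartNew();

        // Same Request call as in the post: wait until the service responds.
        await _bus.Request<EventsPing, EventsPingCompleted>(new EventsPing())
            .ConfigureAwait(false);

        stopwatch.Stop();

        // Report how long the round trip over the bus took.
        return Ok(new { elapsedMilliseconds = stopwatch.ElapsedMilliseconds });
    }
}
```

Measuring with a Stopwatch around the awaited request gives the full round-trip time, including the broker hops in both directions.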

Consume Command

Once the command has been sent, it needs to be consumed by the consuming service. To do this, create a Consumer that implements IConsumer<T> for the message type that you want to handle. Mine looks like this:

public class EventsPingConsumer : IConsumer<EventsPing>
{
  private readonly ILogger<EventsPingConsumer> _logger;
  public EventsPingConsumer(ILogger<EventsPingConsumer> logger)
  {
    _logger = logger;
  }

  public async Task Consume(ConsumeContext<EventsPing> context)
  {
    // log
    _logger.LogInformation("Ping command received");

    // respond to the request with PingCompleted
    var pingCompleted = new EventsPingCompleted();
    await context.RespondAsync(pingCompleted).ConfigureAwait(false);
  }
}

Inside the Consumer, you will have the logic you want to perform when the command or event is received.
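
The consumer works with two message types, EventsPing and EventsPingCompleted, that are not listed in this post. A minimal sketch of what such contracts could look like follows; the property names are assumptions chosen for illustration, and the only thing the rest of the code needs is that both classes have a parameterless constructor.

```csharp
using System;

// Hypothetical message contracts; the property names are assumptions.
public class EventsPing
{
    // Lets the sender match a completed ping to the one it sent.
    public Guid PingId { get; set; } = Guid.NewGuid();
    public DateTime SentAtUtc { get; set; } = DateTime.UtcNow;
}

public class EventsPingCompleted
{
    public Guid PingId { get; set; }
    public DateTime CompletedAtUtc { get; set; } = DateTime.UtcNow;
}
```

MassTransit identifies messages by their namespace and type name, so both services should reference the same contract classes, typically from a shared project or package.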

Once the Consumer is written, it needs to be registered with MassTransit. This is done wherever you add MassTransit to the application; in this case, that is the BusModule Autofac module rather than Startup.cs. Inside the function passed to AddMassTransit, add the Consumer.

builder.AddMassTransit(x =>
{
    // Add Consumers
    x.AddConsumer<EventsPingConsumer>();

    // Bus Configuration
    x.AddBus(context =>
    {
      ...
    });
  ...
});

The final step is to configure the Consumer on an endpoint. Basically, what is happening here is that we attach it to the correct Endpoint so that the Consumer is listening in the correct place for the relevant command or event.

This happens inside the Bus Factory, in the function for ReceiveEndpoint.

cfg.ReceiveEndpoint(BusSettings.EventsService, ep =>
  {
    // Configure Endpoint
    ...
    // Configure Consumers
    ep.ConfigureConsumer<EventsPingConsumer>(context);
  });

Publish Event

Events are published to notify that something has happened in a system. This is easily done using the Publish method.

// publish PingCompleted
var pingCompleted = new EventsPingCompleted();
await context.Publish(pingCompleted);
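
On the receiving side, subscribing to a published event uses the same consumer pattern as a command. As a sketch, a consumer in the API for the EventsPingCompleted event could look like the following; the class name is an assumption, and it would be added with AddConsumer and ConfigureConsumer on the API’s own receive endpoint in the same way as EventsPingConsumer.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;

// Hypothetical API-side subscriber for the EventsPingCompleted event.
public class EventsPingCompletedConsumer : IConsumer<EventsPingCompleted>
{
    private readonly ILogger<EventsPingCompletedConsumer> _logger;

    public EventsPingCompletedConsumer(ILogger<EventsPingCompletedConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<EventsPingCompleted> context)
    {
        // MessageId is assigned by MassTransit when the event is published.
        _logger.LogInformation("Ping completed event received: {MessageId}", context.MessageId);
        return Task.CompletedTask;
    }
}
```

Unlike Send, Publish does not target a single queue: every endpoint with a consumer for the event type gets its own copy.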

Request/Response

Request/Response is useful when you need to know when an event has been completed and you are happy to wait for it.

I touched on this when I was talking about the Send Commands. There is the request from the Producer that sends the command and then awaits a response of a given Event or Command back.

var response = await _bus.Request<EventsPing, EventsPingCompleted>(ping).ConfigureAwait(false);

The Consumer responds with:

var pingCompleted = new EventsPingCompleted();
await context.RespondAsync(pingCompleted).ConfigureAwait(false);

This uses the RequestId and ResponseAddress that MassTransit attaches to the request to route the EventsPingCompleted response back to the Producer’s Request.
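
Because the caller is waiting, it is worth deciding what happens when no response arrives. The sketch below uses MassTransit’s request client with an explicit timeout instead of the Request extension method shown above. PingRequester is a hypothetical class, the 10 second timeout is an arbitrary choice, and serviceAddress is assumed to be the queue the Consumer listens on.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical helper that wraps the request in a timeout.
public class PingRequester
{
    private readonly IBus _bus;

    public PingRequester(IBus bus)
    {
        _bus = bus;
    }

    // Returns true when the service answered in time, false otherwise.
    public async Task<bool> TryPingAsync(Uri serviceAddress)
    {
        // serviceAddress could be new Uri(host, BusSettings.EventsService),
        // the same address used for the EndpointConvention mapping.
        var client = _bus.CreateRequestClient<EventsPing>(
            serviceAddress, RequestTimeout.After(s: 10));

        try
        {
            Response<EventsPingCompleted> response = await client
                .GetResponse<EventsPingCompleted>(new EventsPing())
                .ConfigureAwait(false);

            return response.Message != null;
        }
        catch (RequestTimeoutException)
        {
            // Nothing came back in time, e.g. the service is down
            // or the request went to the wrong queue.
            return false;
        }
    }
}
```

If the Consumer throws instead of responding, the request client surfaces that as a RequestFaultException, which this sketch deliberately lets bubble up.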

Conclusion

MassTransit is pretty easy to use and work with. My biggest challenge was finding the required information to get it working 100%. I’m happy I chose this as the messaging service for my project. Previously I had used NServiceBus, but I felt it didn’t suit this project because of the eventual licencing cost, and because NServiceBus’ features are getting pretty complicated. I feel that it is becoming more than a service bus.
