Queuing system

The CoreConnect architecture relies on a queuing system shared by the Functions and Gateway services. This provides a high level of scalability and reliability. During CoreConnect installation, you must choose one of the following plugins based on your infrastructure:

Azure Service Bus

Plugin for integrating with Azure Service Bus.

Name: CoreConnect.Queue.AzureServiceBus
Choice: true
Configuration name: AzureServiceBus
Configuration:
{
  "ConnectionString": ""
}
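
The ConnectionString value is empty in the template above. As a minimal sketch, assuming the standard Azure Service Bus connection string format and the Azure.Messaging.ServiceBus NuGet package (neither is mentioned on this page), a value can be sanity-checked before it goes into the configuration. The namespace, key name, and key below are placeholders.

```csharp
using System;
using Azure.Messaging.ServiceBus;

// Placeholder value: replace the namespace, key name, and key with your own.
const string connectionString =
    "Endpoint=sb://my-namespace.servicebus.windows.net/;" +
    "SharedAccessKeyName=RootManageSharedAccessKey;" +
    "SharedAccessKey=<your-key>";

// Parse throws a FormatException when the string is malformed.
var properties = ServiceBusConnectionStringProperties.Parse(connectionString);

// Print the parts that were recognized so they can be checked by eye.
Console.WriteLine(properties.FullyQualifiedNamespace);
Console.WriteLine(properties.SharedAccessKeyName);
```

If parsing succeeds, the same string is what goes into the "ConnectionString" field.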

RabbitMQ

Plugin for integrating with RabbitMQ.

Name: CoreConnect.Queue.RabbitMQ
Choice: true
Configuration name: RabbitMQ
Configuration:
{
  "Host": "localhost",
  "VirtualHost": "",
  "UserName": "guest",
  "Password": "guest"
}
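
Because CoreConnect consumers are MassTransit consumers, these four settings plausibly correspond to the host options of MassTransit's RabbitMQ transport. The following is only a sketch of that correspondence, assuming the MassTransit.RabbitMQ package. This page does not show how the plugin actually wires the values, and treating an empty VirtualHost as the default "/" is an assumption.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        // "Host" and "VirtualHost" from the configuration ("/" is RabbitMQ's default virtual host).
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest"); // "UserName"
            h.Password("guest"); // "Password"
        });

        // Create receive endpoints for any registered consumers.
        cfg.ConfigureEndpoints(context);
    });
});
```

Note that RabbitMQ's default guest account can only connect from localhost, so a remote broker needs a dedicated user.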

Technical details

The queuing system processes messages in the correct sequence, safeguards against message loss, buffers traffic surges, and makes horizontal scaling straightforward. In CoreConnect’s architecture, messages are typically sent to the queue by the Gateway (or by other external services) and processed by the Functions service.

Here’s an example of how you can make use of the queuing system in your code:

In the Functions service, you can create and activate a custom plugin (see Custom Plugins) that inherits from CoreConnect.Plugins.CoreConnectPlugin. By overriding the RegisterQueues method, you register the queues that your service listens to, together with the consumer class (an implementation of MassTransit.IConsumer) that processes their messages.

using System.Threading.Tasks;
using CoreConnect.Plugins;
using MassTransit;
using Microsoft.Extensions.Logging;

public class MyCustomPlugin : CoreConnectPlugin
{
    public override string Name => "MyCustomPlugin";
    public override string ConfigurationName => "MyCustomPluginSettings";
    public override string Description => "Custom plugin for CoreConnect";
    ...

    // Bind the "account-register-email" queue to its message type and consumer.
    public override void RegisterQueues(ILogger logger, IQueueRegistration registration)
    {
        registration.RegisterQueue<AccountRegisterEmailMessage, AccountRegisterEmailConsumer>("account-register-email");
    }
}

// A consumer holds services, not data, so it is declared as a class rather than a record.
public class AccountRegisterEmailConsumer : IConsumer<AccountRegisterEmailMessage>
{
    private readonly IEmailProvider _emailService;
    private readonly ILogger<AccountRegisterEmailConsumer> _logger;

    public AccountRegisterEmailConsumer(ILogger<AccountRegisterEmailConsumer> logger, IEmailProvider emailService)
    {
        _logger = logger;
        _emailService = emailService;
    }

    public async Task Consume(ConsumeContext<AccountRegisterEmailMessage> context)
    {
        // Structured logging; the email token is deliberately left out of the log.
        _logger.LogInformation("Processing registration email for {Email} (locale {Locale})",
            context.Message.Email,
            context.Message.Locale);

        // Await the send so that a failure surfaces to the queuing system.
        await _emailService.SendEmailAsync(...);
    }
}

// Message contract shared by the publisher (Gateway) and the consumer (Functions).
public class AccountRegisterEmailMessage
{
    public string? Email { get; set; }
    public string? Locale { get; set; }
    public string? CountryCode { get; set; }
    public string? EmailToken { get; set; }
}

In this example, the RegisterQueues method registers the account-register-email queue with the consumer class AccountRegisterEmailConsumer, which processes messages of type AccountRegisterEmailMessage. To send a message to the queue from the Gateway service, inject the MassTransit.IBus service and publish the message:

using MassTransit;
using CoreConnect.Commerce.Customer;

public async Task<SignUpResponse?> SignUp(
    [Service] IRequestContext requestContext,
    [Service] ICustomerService customerService,
    [Service] IBus bus,
    SignUpRequest request,
    CancellationToken cancellation)
{
    var response = await customerService.SignUp(request, requestContext, cancellation);
    ...
    // emailToken is produced by the elided code above.
    AccountRegisterEmailMessage message = new()
    {
        CountryCode = requestContext.CountryCode,
        Locale = requestContext.Locale,
        Email = response.Customer.Email,
        EmailToken = emailToken ?? string.Empty
    };

    // Hand the message to the queuing system; the Functions service consumes it asynchronously.
    await bus.Publish(message, cancellation);
    return response;
}
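
The publish and consume round trip described above can be exercised without a broker. The following is a minimal sketch using MassTransit's in-memory test harness, assuming the MassTransit and Microsoft.Extensions.DependencyInjection packages. The message and consumer are reduced, hypothetical stand-ins for the ones shown earlier so that the sketch compiles on its own, and QueueRoundTrip is an illustrative name, not part of CoreConnect.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Testing;
using Microsoft.Extensions.DependencyInjection;

// Reduced stand-in for the message contract shown earlier.
public class AccountRegisterEmailMessage
{
    public string? Email { get; set; }
    public string? Locale { get; set; }
}

// Reduced stand-in for the consumer: it only writes to the console.
public class AccountRegisterEmailConsumer : IConsumer<AccountRegisterEmailMessage>
{
    public Task Consume(ConsumeContext<AccountRegisterEmailMessage> context)
    {
        Console.WriteLine($"Would send email to {context.Message.Email}");
        return Task.CompletedTask;
    }
}

public static class QueueRoundTrip
{
    // Publishes one message on an in-memory bus and reports whether it was consumed.
    public static async Task<bool> RunAsync()
    {
        await using var provider = new ServiceCollection()
            .AddMassTransitTestHarness(x => x.AddConsumer<AccountRegisterEmailConsumer>())
            .BuildServiceProvider(true);

        var harness = provider.GetRequiredService<ITestHarness>();
        await harness.Start();

        await harness.Bus.Publish(new AccountRegisterEmailMessage
        {
            Email = "user@example.com",
            Locale = "en-US"
        });

        // Waits (up to the harness timeout) for a message of this type to be consumed.
        return await harness.Consumed.Any<AccountRegisterEmailMessage>();
    }

    public static async Task Main()
    {
        Console.WriteLine($"Consumed: {await RunAsync()}");
    }
}
```

The harness replaces the configured queue plugin with an in-memory transport, so it checks the message contract and consumer logic, not the Azure Service Bus or RabbitMQ connection.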