Опыт использования MassTransit 3.0
MassTransit это open source библиотека, разработанная на языке C# для .NET платформы, упрощающая работу с шиной данных, которая используется при построении распределенных приложений и реализации SOA (service oriented architecture).
В качестве message broker могут выступать RabbitMq, Azure Service Bus или In-Memory менеджер (в случае с In-Memory область видимости ограничивается процессом, в котором проинициализирован экземпляр).
Содержание:
- Команды и события
- Контракты сообщений
- Роутинг
- Консьюмеры (Consumer)
- Конфигурация контейнера DI
- Наблюдатели (Observer)
- Новое в MassTransit 3.0
- Заключение
Команды и события
В библиотеке заложено 2 основных типа сообщений: команды и события.
Команды
Сигнализируют о необходимости выполнить некое действие. Для наиболее содержательного наименования команды желательно использовать структуру глагол + существительное: EstimateConnection, SendSms, NotifyCustomerOrderProcessed.
Работа с командами осуществляется с помощью метода Send (интерфейса ISendEndpoint) и указания получателя endpoint (очереди):
private static async Task SendSmsCommand(IBus busControl)
{
var command = new SendSmsCommand
{
CommandId = Guid.NewGuid(),
PhoneNumber = "89031112233",
Message = "Thank you for your reply"
};
var endpoint = await busControl.GetSendEndpoint(AppSettings.CommandEndpoint);
await endpoint.Send(command);
}
События
Сигнализируют о случившемся событии, которое может быть интересно некоему набору подписчиков (паттерн Observer), которые на эти события реагируют, например: ConnectionEstimated, CallTerminated, SmsSent, CustomerNotified.
Работа с событиями осуществляется с помощью метода Publish (интерфейса IPublishEndpoint).
В терминологии заложено и основное различие этих типов сообщений — команда доставляется единственному исполнителю (дабы избежать дублирования выполнения):
Изображение из статьи MassTransit Send vs. Publish
В то время как событие ориентировано на оповещение n-подписчиков, каждый из которых реагирует на случившееся событие по-своему.
Изображение из статьи MassTransit Send vs. Publish
Иными словами, при запущенных n-консьюмеров (от англ. consumer — потребитель, обработчик), обрабатывающих команду, после её публикации только один из них получит сообщение о ней, в то время как сообщение о событии получит каждый.
Контракты сообщений
Согласно документации MassTransit, при объявлении контрактов сообщений рекомендуется прибегать к интерфейсам:
public interface ISendSms {
Guid CommandId { get; }
string PhoneNumber { get; }
string Message { get; }
}
Как упоминалось ранее, отправка команд должна осуществляться исключительно с помощью метода Send (интерфейса IBus) и указания адресата (endpoint).
public interface ISmsSent {
Guid EventId { get; }
DateTime SentAtUtc { get; }
}
События отправляются с помощью метода Publish.
Роутинг
Как распределение сообщений по exchange, так и выбор консьюмеров (о них в этой статье будет рассказано чуть позже) для обработки базируются на runtime типах этих сообщений,- в наименовании используются namespace и имя типа, в случае с generic имя родительского типа и перечень аргументов.
Exchange
При конфигурации receive endpoint‘a (подключении ранее зарегистрированных консьюмеров) в случае использования в качестве канала доставки сообщений RabbitMq на основании указанных к обработке консьюмерами типов сообщений формируются наименования требуемых exchange, в которые затем эти самые сообщения и будут помещаться.
Аналогичные действия на этапе конфигурации send endpoint‘a выполняются и для команд, для отправки которых также требуются собственные exchange.
На изображении можно увидеть созданные в рамках моего сценария exchange:
В том случае, если конфигурируя receive endpoint мы указываем наименование очереди:
cfg.ReceiveEndpoint(host, "play_with_masstransit_queue", e => e.LoadFrom(container));
то в привязках exchange сообщений можно будет увидеть следующую картину:
Итоговый путь сообщения, тип которого имплементирует ISmsEvent, будет иметь следующий вид:
Если же конфигурация осуществляется без указания очереди:
cfg.ReceiveEndpoint(host, e=> e.LoadFrom(container));
То имена для последнего exchange и очереди формируются автоматически, а по завершению работы они будут удалены:
Формат сообщения
Говоря о формате сообщения, хотелось бы подробнее остановиться на наименовании (или messageType). За его формирование (заголовков urn:message:) ответственна функция GetUrnForType(Type type). Для примера я добавил для команды ISendSms наследование от ICommand и generic тип:
public interface ICommand<T>
{
}
public interface ISendSms<T> : ICommand<T>
{
Guid CommandId { get; }
string PhoneNumber { get; }
string Message { get; }
}
class SendSmsCommand : ISendSms<string>
{
public Guid CommandId { get; set; }
public string PhoneNumber { get; set; }
public string Message { get; set; }
}
Сформированное сообщение в таком случае будет содержать следующее значение в поле messageType (на основании которого после получения сообщения затем и выбирается ответственный за обработку консьюмер):
"messageType": [
"urn:message:PlayWithMassTransit30.Extension:SendSmsCommand",
"urn:message:PlayWithMassTransit30.Contract.Command:ISendSms[[System:String]]",
"urn:message:PlayWithMassTransit30.Contract.Command:ICommand[[System:String]]"
]
Кроме messageType сообщение содержит информацию о host, которым оно было отправлено:
"host": {
"machineName": "DESKTOP-SI9OHUR",
"processName": "PlayWithMassTransit30.vshost",
"processId": 1092,
"assembly": "PlayWithMassTransit30",
"assemblyVersion": "1.0.0.0",
"frameworkVersion": "4.0.30319.42000",
"massTransitVersion": "3.4.1.808",
"operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
}
Значимую часть payload:
"message": {
"commandId": "7388f663-82dc-403a-8bf9-8952f2ff262e",
"phoneNumber": "89031112233",
"message": "Thank you for your reply"
}
и иные служебные поля и заголовки.
Консьюмеры (Consumer)
Консьюмер — это класс, который обрабатывает один или несколько типов сообщений, указываемых при объявлении в наследовании интерфейса IConsumer, где T это тип обрабатываемого данным консьюмером сообщения.
Пример консьюмера, обрабатывающего команду ISendSms и публикующего событие ISmsSent:
public class SendSmsConsumer : IConsumer<ISendSms<string>>
{
public SendSmsConsumer(IBusControl busControl)
{
_busControl = busControl;
}
public async Task Consume(ConsumeContext<ISendSms<string>> context)
{
var message = context.Message;
Console.WriteLine($"[IConsumer<ISendSms>] Send sms command consumed");
Console.WriteLine($"[IConsumer<ISendSms>] CommandId: {message.CommandId}");
Console.WriteLine($"[IConsumer<ISendSms>] Phone number: {message.PhoneNumber}");
Console.WriteLine($"[IConsumer<ISendSms>] Message: {message.Message}");
Console.Write(Environment.NewLine);
Console.WriteLine("Публикация события: Смс сообщение отправлено");
await _busControl.SmsSent(DateTime.UtcNow);
}
private readonly IBus _busControl;
}
После того, как мы получили команду на отправку смс сообщения и выполнили требуемые действия, мы формируем и отправляем событие о том, что смс доставлено.
Код отправки сообщений я вынес в отдельный Extension класс над IBusControl, там же находится и имплементация самих сообщений:
public static class BusExtensions
{
/// <summary>
/// Отправка смс сообщения
/// </summary>
/// <param name="bus"></param>
/// <param name="host"></param>
/// <param name="phoneNumber"></param>
/// <param name="message"></param>
/// <returns></returns>
public static async Task SendSms(
this IBus bus, Uri host, string phoneNumber, string message
)
{
var command = new SendSmsCommand
{
CommandId = Guid.NewGuid(),
PhoneNumber = phoneNumber,
Message = message
};
await bus.SendCommand(host, command);
}
/// <summary>
/// Публикация события об отправке смс сообщения
/// </summary>
/// <param name="bus"></param>
/// <param name="sentAtUtc"></param>
/// <returns></returns>
public static async Task SmsSent(
this IBus bus, DateTime sentAtUtc
)
{
var @event = new SmsSentEvent
{
EventId = Guid.NewGuid(),
SentAtUtc = sentAtUtc
};
await bus.PublishEvent(@event);
}
/// <summary>
/// Отправка команды
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="bus"></param>
/// <param name="address"></param>
/// <param name="command"></param>
/// <returns></returns>
private static async Task SendCommand<T>(this IBus bus, Uri address, T command) where T : class
{
var endpoint = await bus.GetSendEndpoint(address);
await endpoint.Send(command);
}
/// <summary>
/// Публикация события
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="bus"></param>
/// <param name="message"></param>
/// <returns></returns>
private static async Task PublishEvent<T>(this IBus bus, T message) where T : class
{
await bus.Publish(message);
}
}
class SendSmsCommand : ISendSms<string>
{
public Guid CommandId { get; set; }
public string PhoneNumber { get; set; }
public string Message { get; set; }
}
class SmsSentEvent : ISmsSent
{
public Guid EventId { get; set; }
public DateTime SentAtUtc { get; set; }
}
На мой взгляд, данное решение вполне удачно позволяет отделить код бизнес-логики от деталей реализации межсистемного (компонентного) взаимодействия и инкапсулировать их в одном месте.
Конфигурация контейнера DI
На данный момент MassTransit предоставляет возможность использовать следующие популярные контейнеры:
- Autofac;
- Ninject;
- StructureMap;
- Unity;
- Castle Windsor.
В случае с UnityContainer потребуется установить nuget package MassTransit.Unity, после чего станет доступен метод расширения LoadFrom:
public static class UnityExtensions
{
public static void LoadFrom(this IReceiveEndpointConfigurator configurator, IUnityContainer container);
}
Пример использования выглядит следующим образом:
public static IBusControl GetConfiguredFactory(IUnityContainer container)
{
if (container == null)
{
throw new ArgumentNullException(nameof(container));
}
var control = Bus.Factory.CreateUsingRabbitMq(cfg => {
var host = cfg.Host(AppSettings.Host, h => { });
// cfg.ReceiveEndpoint(host, e => e.LoadFrom(container));
cfg.ReceiveEndpoint(host, "play_with_masstransit_queue", e => e.LoadFrom(container));
});
control.ConnectConsumeObserver(new ConsumeObserver());
control.ConnectReceiveObserver(new ReceiveObserver());
control.ConnectConsumeMessageObserver(new ConsumeObserverSendSmsCommand());
control.ConnectSendObserver(new SendObserver());
control.ConnectPublishObserver(new PublishObserver());
return control;
}
В качестве срока жизни консьюмеров в контейнере документация предлагает использовать ContainerControlledLifetimeManager().
Наблюдатели (Observer)
Для мониторинга процесса обработки сообщений доступно подключение наблюдателей (Observer). Для этого MassTransit предоставляет следующий набор интерфейсов для обработчиков:
- IReceiveObserver- срабатывает сразу же после получения сообщения и создания RecieveContext;
- IConsumeObserver — срабатывает после создания ConsumeContext;
- IConsumeMessageObserver
— для наблюдения за сообщениями типа T, в методах которого будет доступно строго-типизированное содержимое сообщения; - ISendObserver — для наблюдения за отправляемыми командами;
- IPublishObserver — для наблюдения за отправляемыми событиями.
Для каждого из них интерфейс IBusControl предоставляет собственный метод подключения, выполнение которого должно быть осуществлено непосредственно перед IBusControl.Start().
В качестве примера далее представлена реализация ConsumeObserver:
public class ConsumeObserver : IConsumeObserver
{
public Task PreConsume<T>(ConsumeContext<T> context) where T : class
{
Console.WriteLine($"[ConsumeObserver] PreConsume {context.MessageId}");
return Task.CompletedTask;
}
public Task PostConsume<T>(ConsumeContext<T> context) where T : class
{
Console.WriteLine($"[ConsumeObserver] PostConsume {context.MessageId}");
return Task.CompletedTask;
}
public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
{
Console.WriteLine($"[ConsumeObserver] ConsumeFault {context.MessageId}");
return Task.CompletedTask;
}
}
Я не буду приводить код каждого из консьюмеров, т.к. по принципу работы и структуре они схожи. Имплементацию каждого из них можно посмотреть в документации или в исходниках на Github.
Итоговый pipeline получения команды на отправку смс сообщения, её обработки и публикации события о её успешном выполнении выглядит следующим образом:
Новое в MassTransit 3.0
С изменениями, которые коснулись новой версии библиотеки, вы можете ознакомиться в 2-х обзорных статьях автора библиотеки Chris Patterson’а на страницах его блога: MassTransit 3 API Changes и MassTransit v3 Update.
Заключение
Здесь должно было быть сравнение наиболее популярных библиотек для работы с очередями сообщений, однако, я решил оставить это для отдельной статьи.
Надеюсь, мне удалось провести для вас поверхностное знакомство с библиотекой MassTransit, за гранью которого ещё остаются такие интересные вещи, как транзакционность, персистентность (интеграция с NHibernate, MondoDb, EntityFramework), планировщик отправки сообщений (интеграция с Quartz), state machine (Automatonymous и Saga), логирование (Log4Net, NLog), многопоточность и многое другое.
Исходные коды примеров доступны на Github.
Используемые материалы: