DAPR publish-subscribe over gRPC

Dan Kuida
Hitchhikers guide to the (software) galaxy
6 min read · Jan 30, 2024


Intro

I am currently the architect of a system decomposed using the IDesign Method. The system is composed of subsystems that communicate with each other using messages: each subsystem can publish messages and subscribe to messages.

The system utilizes DAPR. In this post I show how to use DAPR’s publish-subscribe component over gRPC, a remote procedure call framework developed by Google.

Outline

  • Why DAPR
  • Why publish-subscribe, pub-sub
  • Why gRPC
  • Description of the challenge
  • A solution to the challenge
  • Afterword

Why DAPR

Software is built from granular building blocks; I use the term “component” for them interchangeably. Over the years the technology behind these components has changed, but one of the main goals remains the same: reusability. The most recent incarnation of a component is a service. A service can communicate over network protocols or invoke methods locally on the same computer. The industry adopted the term (micro)service to signify a scoped responsibility for a particular component.

DAPR is the current pinnacle of this trend. It is straightforward to embed and utilize, and by adopting it:

  • Developers reap the fruits of infrastructure (iFX) implemented by other developers specializing in that component.
  • DAPR hides many aspects of distributed computing to provide a cohesive development approach without the need to be a distributed systems guru.
  • It builds on top of containerization, and containers have become a de facto standard in the software toolchain.

Once a team puts in the modest effort to learn DAPR concepts, it can focus on the primary goal of any software shop: providing value to the business. Software teams must understand the business domain and focus on it rather than spending most of their time developing infrastructure and wiring it into their process.

Standardizing infrastructure has another benefit: onboarding or moving team members stops being a technology problem. Now, it is a business-domain training process.

Why publish-subscribe

Publish-subscribe is commonly abbreviated as pub-sub. Pub-sub can be local to one computer or span several; it can stay within a single project, language, repository, or team, or cross any of those boundaries. It is a pattern that can be implemented in code or provided by an external solution.

Pub-sub allows the decoupling of the caller and callee. Truly asynchronous, responsive code must not wait for the entire chain of execution to finish. In the pub-sub approach, the caller publishes a message with all the required data (or references to it). The subscriber executes the chain and might signal back with another message. When (not if) execution goes wrong, the pattern allows for retries.
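To make the publish side of that flow concrete, here is a minimal sketch using the DAPR .NET client; the component name "pubsub", the topic "orders", and the Order type are illustrative assumptions, not part of the system described here.

using System.Threading.Tasks;
using Dapr.Client;

// Illustrative message type; any serializable DTO works.
public record Order(string Id, decimal Total);

public static class PublisherSketch
{
    public static async Task PublishAsync()
    {
        DaprClient client = new DaprClientBuilder().Build();

        // The sidecar wraps the payload in a CloudEvent and delivers it to all subscribers of the topic.
        await client.PublishEventAsync("pubsub", "orders", new Order("42", 99.90m));
    }
}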

A widespread use of pub-sub is the push notifications that every mobile user receives many times daily.

Because pub-sub is an orchestration pattern, it is important to impose limits on which types of components are allowed to publish and subscribe. You can read more about that in Righting Software, page 79, “Design Don’ts”.

Why gRPC

Another long-running trend in the software industry is scaling out. When a component or a group of components hits a limit of the computer’s resources, typically memory or CPU, the usual approach is to spread the load across multiple computers. I mentioned (micro)services earlier; to some extent they are synonymous with web services, and in many software shops HTTP(S) became the norm as the communication protocol, the transport, between these components.

The message bus is a core principle in scalability. Publish-Subscribe is essentially a specific use-case or pattern supported by the broader concept of a message bus.

2023 is well past the age of inventing your own infrastructure, and DAPR is mature enough to be a building block in a modern system. We took the DAPR component for publish-subscribe, but there is a caveat: the examples contained no mention of how to implement sidecar callbacks over gRPC. After reading code from fellow IDesign alumni, plus a lot of source-code reading of the .NET SDK and the TypeScript SDK (TypeScript will be covered in another post), we have a coherent publish-subscribe flow infrastructure that can be easily:

  • reused.
  • configured as an option and extension with a few lines.
  • auto-discoverable.

Description of the challenge

There is no built-in support for pub-sub in the .NET DAPR SDK when using gRPC only.

A distributed system is the invocation of commands via messages (DTOs, models, classes). Using the same transport protocol for client-facing and internal communication is unwise. To paraphrase Juval Löwy: “You do not communicate with your liver via verbal commands.”

No amount of magic will make the DAPR HTTP SDK work the same as the gRPC SDK.

There is a good reason for that: while HTTP has a route path, gRPC does not. There is a brilliant implementation that supports this in DAPR’s JavaScript SDK, but it is not available in .NET.
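To illustrate the route-path difference, this is roughly what HTTP-based subscription looks like with the Dapr.AspNetCore integration (a sketch; the component name "pubsub", the topic "orders", and the Order type are assumptions). The [Topic] attribute binds a topic to a concrete route, and the sidecar discovers those routes through MapSubscribeHandler; there is no equivalent route concept to hook into on the gRPC side of the .NET SDK.

using Dapr;
using Microsoft.AspNetCore.Mvc;

// Illustrative message type.
public record Order(string Id, decimal Total);

[ApiController]
public class OrdersController : ControllerBase
{
    // The sidecar delivers events published to "orders" to this route.
    [Topic("pubsub", "orders")]
    [HttpPost("/orders")]
    public IActionResult OnOrder(Order order)
    {
        // work with the message here
        return Ok();
    }
}

// In Program.cs: app.MapSubscribeHandler(); exposes /dapr/subscribe so the
// sidecar can discover the [Topic] routes above.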

A solution to the challenge

I repeat: the DAPR TypeScript SDK implementation is much richer.

The source-code documentation explains that the sidecar will invoke specific callback events on the service. Once you configure communication with the sidecar over gRPC, this is how it works.

One solution was to use MassTransit (a great tool in itself). The caveat is type resolution with polymorphism: MassTransit invokes the consumers of all supertypes when a subtype is published. Not good for us.

We wanted a strategy pattern by the book, with CloudEvents providing the context for the data (as intended). A bonus would be auto-configuration of the subscribers via assembly scanning.

Message callback based on type

  1. We switched to MediatR, which resolves handler types correctly.
  2. The signature for a consumer is IRequestHandler<T>.

Here is what the pub-sub iFX (infrastructure) registration looks like:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using MediatR;
using Microsoft.Extensions.DependencyInjection;

public static class RegistrationExtensions
{
    /// ...

    public static IServiceCollection AddPubSub(this IServiceCollection services, Type[] handlers)
    {
        services.AddMediatR(registration =>
        {
            Assembly[] assemblies = handlers.Select(t => t.Assembly).Distinct().ToArray();
            registration.RegisterServicesFromAssemblies(assemblies);
            foreach (Type handler in handlers)
            {
                RegisterTypes(handler);
            }
        });

        return services;
    }

    static void RegisterTypes(Type handler)
    {
        Type requestTypeHandler = typeof(IRequestHandler<>).GetGenericTypeDefinition();

        // Collect the assemblies of the message types (the T in IRequestHandler<T>)
        // implemented by this handler; the IsGenericType check skips non-generic interfaces.
        List<Assembly> messageAssemblies = handler
            .FindInterfaces((type, criteria) =>
                type.IsGenericType && type.GetGenericTypeDefinition().IsAssignableTo(criteria as Type),
                requestTypeHandler)
            .Select(i => i.GetGenericArguments().FirstOrDefault())
            .Where(t => t != null)
            .Cast<Type>()
            .Select(t => t.Assembly)
            .Distinct()
            .ToList();

        TypeCache.RegisterTypes(messageAssemblies);
    }
}
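TypeCache, used by RegisterTypes above and by OnTopicEvent further down, is not shown in this post; here is a minimal sketch of what it might look like, assuming the CloudEvent “type” attribute matches the CLR type name of the message.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public static class TypeCache
{
    static readonly ConcurrentDictionary<string, Type> s_Types = new();

    // Index the exported types of the handlers' assemblies by simple name.
    public static void RegisterTypes(IEnumerable<Assembly> assemblies)
    {
        foreach (Type type in assemblies.SelectMany(a => a.GetExportedTypes()))
        {
            s_Types.TryAdd(type.Name, type);
        }
    }

    // Resolve the CloudEvent "type" attribute back to a CLR type.
    // Throwing TypeLoadException lets OnTopicEvent map unknown types to Drop.
    public static Type GetType(string cloudEventType) =>
        s_Types.TryGetValue(cloudEventType, out Type? type)
            ? type
            : throw new TypeLoadException($"Unknown message type '{cloudEventType}'");
}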

3. Subscriptions: code subscription versus component subscription. We could extract the T types from IRequestHandler<T> and generate topic subscriptions, as the sketch after the next snippet shows.

private static void AddRules(List<Type> handlers)
{
    // Extract the T message types from each handler's IRequestHandler<T> interfaces.
    Type[] requestTypes = handlers
        .SelectMany(handler => handler
            .FindInterfaces((type, criteria) =>
                type.IsGenericType && type.GetGenericTypeDefinition().IsAssignableTo(criteria as Type),
                typeof(IRequestHandler<>).GetGenericTypeDefinition())
            .Select(i => i.GetGenericArguments().FirstOrDefault())
            .Where(t => t != null)
            .Cast<Type>())
        .ToArray();

    // new TopicSubscription()
}
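For completeness, here is a sketch of what the code-based subscription could have looked like: the message types extracted above are turned into TopicSubscription entries and handed to the sidecar from the ListTopicSubscriptions callback. The service class name, the static type list, the component name "pubsub", and the topic-per-type-name convention are all assumptions.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dapr.AppCallback.Autogen.Grpc.v1;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;

public partial class DaprAppCallbackService : AppCallback.AppCallbackBase
{
    // Assumed to be filled by AddRules with the T types from IRequestHandler<T>.
    static readonly List<Type> s_MessageTypes = new();

    public override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(Empty request, ServerCallContext context)
    {
        ListTopicSubscriptionsResponse response = new();
        foreach (Type messageType in s_MessageTypes)
        {
            response.Subscriptions.Add(new TopicSubscription
            {
                PubsubName = "pubsub",     // assumed pub-sub component name
                Topic = messageType.Name   // assumed topic naming convention
            });
        }
        return Task.FromResult(response);
    }
}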

BUT

  • The dead-letter topic would have to be configured externally.
  • That would split configuration between the code and the component configuration; I prefer a single place for configuration.

4. Publishing. After resolving the CloudEvent, we can deserialize its Data property to the appropriate type and route it to a subscriber/handler. The wiring of this callback into the gRPC host is sketched after the snippet below.

public override async Task<TopicEventResponse> OnTopicEvent(TopicEventRequest request, ServerCallContext context)
{
    m_Logger.LogInformation("Received on topic event type: {EventType}", request.Type);
    TopicEventResponse topicEventResponse = new();
    try
    {
        // Resolve the CloudEvent "type" attribute back to a CLR type and deserialize the payload.
        Type dataType = TypeCache.GetType(request.Type);
        object data = JsonSerializer.Deserialize(request.Data.Span, dataType, m_DaprClient.JsonSerializerOptions)!;

        // Route the message to its IRequestHandler<T> via MediatR.
        await m_Mediator.Send(data);

        // Only report success when deserialization and handling both completed.
        topicEventResponse.Status = TopicEventResponse.Types.TopicEventResponseStatus.Success;
    }
    catch (TypeLoadException e)
    {
        m_Logger.LogError(e, "Error while processing type: {EventType}", request.Type);
        topicEventResponse.Status = TopicEventResponse.Types.TopicEventResponseStatus.Drop;
    }
    catch (Exception e)
    {
        m_Logger.LogError(e, "Error while processing request, message ID: {MessageId}", request.Id);
        topicEventResponse.Status = TopicEventResponse.Types.TopicEventResponseStatus.Retry;
    }

    return topicEventResponse;
}
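For the sidecar to be able to invoke this callback, the class hosting OnTopicEvent (derived from AppCallback.AppCallbackBase in the DAPR app-callback gRPC contract) has to be exposed as a gRPC service. A minimal hosting sketch, assuming that class is called DaprAppCallbackService and that MessageConsumer is the handler shown next:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddGrpc();
builder.Services.AddDaprClient(); // supplies the DaprClient injected as m_DaprClient
builder.Services.AddPubSub(new[] { typeof(MessageConsumer) }); // the AddPubSub extension shown earlier

var app = builder.Build();

// The sidecar calls OnTopicEvent (and the other app-callback RPCs) on this service.
app.MapGrpcService<DaprAppCallbackService>();

app.Run();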

Here is an example of a subscriber that leverages the infrastructure:

// Messages implement MediatR's IRequest marker so IRequestHandler<T> can resolve them.
class InitiateMessages : IRequest
{
}

class ExpandedInitiateMessages : InitiateMessages
{
}

public class MessageConsumer :
    IRequestHandler<InitiateMessages>,
    IRequestHandler<ExpandedInitiateMessages>
{
    public MessageConsumer(IServiceProvider serviceProvider)
    {
    }

    public async Task Handle(InitiateMessages request, CancellationToken cancellationToken)
    {
        // work with your message here
    }

    public async Task Handle(ExpandedInitiateMessages request, CancellationToken cancellationToken)
    {
        // work with your message here; each handler is called for its own type,
        // not for derived types
    }
}

Summary

  • Behind the API, the interaction pattern is very different from the one in front of it.
  • Strive for gRPC invocation with an easy abstraction over the transports (HTTP/2, IPC, a docker-compose private network).
  • Don’t settle for the provided examples. Read the source code and figure out how to make it suitable for your needs.
  • The only easy day was yesterday.

Acknowledgments
