In my previous post (“Singleton” Azure Functions) I offered a tip on how to set up an Azure function to serve as a system-wide “singleton”. One of the use cases I was looking for was to set up a service to act as a “broker between two Service Bus queues“. I will not go into details, suffice to say, there is a requirement to only allow, let’s say, 100 messages per second to pass through the queue.
As I am a strong proponent of “less talk, more code” so here is a rough version that gets the job done:
private static readonly SemaphoreSlim _semaphore = new new SemaphoreSlim(1, 1);
private static readonly QueueClient _outputQueueClient = new QueueClient("{connection string}");
private static double _nextDispatchAt;
private const double _rate = 100; // the rate!
/* Method creates a task scheduled to run approximately
at a given "time" */
private static Task MoveMessageToOutputQueue(Message message)
{
Contract.Assert(message != null);
/* Get a lock to make sure _nextDispatchAt is modified
atomically. This is a lazy implementation but it does the job. */
_semaphore.Wait();
try
{
/* Calculate the next "click". */
var now = DateTime.UtcNow.Ticks;
var delta = _nextDispatchAt - now;
var click = TimeSpan.TicksPerMillisecond * (1000 / _rate);
if (delta > 0)
{
/* We need to wait until the next dispatch. */
var millisToWait = (_nextDispatchAt - now) / TimeSpan.TicksPerMillisecond;
_nextDispatchAt += click;
/* If the number of ticks is below the millisecond
threshold, execute directly. */
return millisToWait > 0 ?
Task.Delay((int)millisToWait).ContinueWith(t => _outputQueueClient.SendAsync(message)) :
_outputQueueClient.SendAsync(message);
}
else
{
/* The execution is direct as there is no waiting time. */
_nextDispatchAt = now + click;
return _outputQueueClient.SendAsync(message);
}
}
finally
{
_semaphore.Release();
}
}
/* This is the actual Azure function which is set up to listen on the "input-queue" */
[FunctionName(nameof(MoveMessageFromInputToOutput))]
public static async Task MoveMessageFromInputToOutput(
[ServiceBusTrigger("input-queue")] Microsoft.ServiceBus.Messaging.BrokeredMessage message,
TraceWriter log)
{
Contract.Assert(message != null);
Contract.Assert(log != null);
log.Debug($"Moving incoming message with label \"{message.Label}\" ...");
/* Make a copy of the message (just the body and whatever fields are interesting). */
Message outputMessage;
using (var ms = new MemoryStream())
{
await message.GetBody<Stream>().CopyToAsync(ms).ConfigureAwait(false);
outputMessage = new Message(ms.ToArray())
{
Label = message.Label,
ContentType = message.ContentType
};
}
await MoveMessageToOutputQueue(outputMessage).ConfigureAwait(false);
}
To make sure the higher rates are handled, and that the function does not starve, one can update the host.json as follows:
{
"serviceBus": {
"maxConcurrentCalls": 32, // Threads will mostly be awaiting
"prefetchCount": 256 // The more the merrier
}
}
After some testing it turns out that the rate limiter is almost perfect on “low rates” such as 100 messages/second. Due limitations of the Service Bus and other factors this code will not surpass ~600 messages/second. In such cases, multiple instances of the rate limiter should be deployed, and dividing the load between them.