In my previous post (“Singleton” Azure Functions) I offered a tip on how to set up an Azure function to serve as a system-wide “singleton”. One of the use cases I had in mind was a service that acts as a “broker between two Service Bus queues”. I will not go into details; suffice it to say that there is a requirement to allow only, let’s say, 100 messages per second to pass through the queue.
As I am a strong proponent of “less talk, more code”, here is a rough version that gets the job done:
    using System;
    using System.Diagnostics.Contracts;
    using System.IO;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Host;

    private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
    /* QueueClient takes the connection string and the name of the queue to send to. */
    private static readonly QueueClient _outputQueueClient =
        new QueueClient("{connection string}", "{output queue name}");
    private static double _nextDispatchAt;
    private const double _rate = 100; // the rate!

    /* Method creates a task scheduled to run approximately at a given "time" */
    private static Task MoveMessageToOutputQueue(Message message)
    {
        Contract.Assert(message != null);

        /* Get a lock to make sure _nextDispatchAt is modified atomically.
           This is a lazy implementation but it does the job. */
        _semaphore.Wait();
        try
        {
            /* Calculate the next "click". */
            var now = DateTime.UtcNow.Ticks;
            var delta = _nextDispatchAt - now;
            var click = TimeSpan.TicksPerMillisecond * (1000 / _rate);

            if (delta > 0)
            {
                /* We need to wait until the next dispatch. */
                var millisToWait = delta / TimeSpan.TicksPerMillisecond;
                _nextDispatchAt += click;

                /* If the wait is below the millisecond threshold, execute directly.
                   Unwrap() makes the returned task complete when the send completes,
                   not merely when the delay has elapsed. */
                return millisToWait >= 1
                    ? Task.Delay((int)millisToWait)
                          .ContinueWith(t => _outputQueueClient.SendAsync(message))
                          .Unwrap()
                    : _outputQueueClient.SendAsync(message);
            }
            else
            {
                /* The execution is direct as there is no waiting time. */
                _nextDispatchAt = now + click;
                return _outputQueueClient.SendAsync(message);
            }
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /* This is the actual Azure function which is set up to listen on the "input-queue" */
    [FunctionName(nameof(MoveMessageFromInputToOutput))]
    public static async Task MoveMessageFromInputToOutput(
        [ServiceBusTrigger("input-queue")] Microsoft.ServiceBus.Messaging.BrokeredMessage message,
        TraceWriter log)
    {
        Contract.Assert(message != null);
        Contract.Assert(log != null);

        log.Debug($"Moving incoming message with label \"{message.Label}\" ...");

        /* Make a copy of the message (just the body and whatever fields are interesting). */
        Message outputMessage;
        using (var ms = new MemoryStream())
        {
            await message.GetBody<Stream>().CopyToAsync(ms).ConfigureAwait(false);
            outputMessage = new Message(ms.ToArray())
            {
                Label = message.Label,
                ContentType = message.ContentType
            };
        }

        await MoveMessageToOutputQueue(outputMessage).ConfigureAwait(false);
    }
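To see what the `_nextDispatchAt` bookkeeping does to a burst of messages, the arithmetic can be pulled out into a pure function and fed simulated arrival times. This is only a sketch of the same calculation, not part of the function above: `PacingSketch` and `DelaysInMilliseconds` are hypothetical names, and the arrival times are small tick values relative to an arbitrary origin rather than real `DateTime.UtcNow.Ticks` readings.

```csharp
using System;
using System.Collections.Generic;

public static class PacingSketch
{
    // Hypothetical helper: for each arrival time (in ticks), returns how many
    // whole milliseconds that message would wait before being sent.
    public static List<long> DelaysInMilliseconds(IEnumerable<long> arrivalTicks, double ratePerSecond)
    {
        // Ticks between two consecutive dispatches (the "click").
        var click = TimeSpan.TicksPerSecond / ratePerSecond;
        double nextDispatchAt = 0;
        var delays = new List<long>();

        foreach (var now in arrivalTicks)
        {
            var delta = nextDispatchAt - now;
            if (delta > 0)
            {
                // The slot is in the future: wait for it, then reserve the next one.
                delays.Add((long)(delta / TimeSpan.TicksPerMillisecond));
                nextDispatchAt += click;
            }
            else
            {
                // No backlog: send right away and open the next slot one click later.
                delays.Add(0);
                nextDispatchAt = now + click;
            }
        }

        return delays;
    }

    public static void Main()
    {
        // Five messages arriving at the same instant, limited to 100 messages/second.
        var delays = DelaysInMilliseconds(new long[] { 0, 0, 0, 0, 0 }, 100);
        Console.WriteLine(string.Join(", ", delays)); // 0, 10, 20, 30, 40
    }
}
```

At 100 messages/second each slot is 10 ms wide, so a burst is spread out in 10 ms steps, while a message that arrives after the last reserved slot has passed goes out immediately.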
To make sure the higher rates are handled, and that the function does not starve, one can update the host.json as follows:
    {
      "serviceBus": {
        "maxConcurrentCalls": 32, // Threads will mostly be awaiting
        "prefetchCount": 256      // The more the merrier
      }
    }
After some testing it turns out that the rate limiter is almost perfect at “low rates” such as 100 messages/second. Due to limitations of the Service Bus and other factors, this code will not surpass ~600 messages/second. In such cases, multiple instances of the rate limiter should be deployed, with the load divided between them.
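As a rough illustration of that last point, the number of instances and the rate each one should be configured with can be estimated from the target rate. This is a back-of-the-envelope sketch: `ScalingSketch` and `Split` are hypothetical names, the 600 messages/second ceiling is simply the figure observed in the tests above, and how messages are actually routed to each instance is left open.

```csharp
using System;

public static class ScalingSketch
{
    // Assumption: roughly 600 messages/second per instance, as observed in testing.
    public const double AssumedCeilingPerInstance = 600;

    // Returns how many instances are needed and the rate each one should enforce.
    public static (int Instances, double RatePerInstance) Split(double targetRate)
    {
        if (targetRate <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(targetRate));
        }

        var instances = (int)Math.Ceiling(targetRate / AssumedCeilingPerInstance);
        return (instances, targetRate / instances);
    }

    public static void Main()
    {
        var (instances, rate) = Split(1500);
        Console.WriteLine($"{instances} instances at {rate} messages/second each");
        // 3 instances at 500 messages/second each
    }
}
```

Each instance would then use its share as the value of `_rate`, so that the combined output stays at the target.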