In my previous post (“Singleton” Azure Functions) I offered a tip on how to set up an Azure function to serve as a system-wide “singleton”. One of the use cases I was looking for was to set up a service to act as a “broker between two Service Bus queues“. I will not go into details, suffice to say, there is a requirement to only allow, let’s say, 100 messages per second to pass through the queue.
As I am a strong proponent of “less talk, more code” so here is a rough version that gets the job done:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | private static readonly SemaphoreSlim _semaphore = new new SemaphoreSlim(1, 1); private static readonly QueueClient _outputQueueClient = new QueueClient( "{connection string}" ); private static double _nextDispatchAt; private const double _rate = 100; // the rate! /* Method creates a task scheduled to run approximately at a given "time" */ private static Task MoveMessageToOutputQueue(Message message) { Contract.Assert(message != null ); /* Get a lock to make sure _nextDispatchAt is modified atomically. This is a lazy implementation but it does the job. */ _semaphore.Wait(); try { /* Calculate the next "click". */ var now = DateTime.UtcNow.Ticks; var delta = _nextDispatchAt - now; var click = TimeSpan.TicksPerMillisecond * (1000 / _rate); if (delta > 0) { /* We need to wait until the next dispatch. */ var millisToWait = (_nextDispatchAt - now) / TimeSpan.TicksPerMillisecond; _nextDispatchAt += click; /* If the number of ticks is below the millisecond threshold, execute directly. */ return millisToWait > 0 ? Task.Delay(( int )millisToWait).ContinueWith(t => _outputQueueClient.SendAsync(message)) : _outputQueueClient.SendAsync(message); } else { /* The execution is direct as there is no waiting time. */ _nextDispatchAt = now + click; return _outputQueueClient.SendAsync(message); } } finally { _semaphore.Release(); } } /* This is the actual Azure function which is set up to listen on the "input-queue" */ [FunctionName(nameof(MoveMessageFromInputToOutput))] public static async Task MoveMessageFromInputToOutput( [ServiceBusTrigger( "input-queue" )] Microsoft.ServiceBus.Messaging.BrokeredMessage message, TraceWriter log) { Contract.Assert(message != null ); Contract.Assert(log != null ); log.Debug($ "Moving incoming message with label \"{message.Label}\" ..." ); /* Make a copy of the message (just the body and whatever fields are interesting). */ Message outputMessage; using ( var ms = new MemoryStream()) { await message.GetBody<Stream>().CopyToAsync(ms).ConfigureAwait( false ); outputMessage = new Message(ms.ToArray()) { Label = message.Label, ContentType = message.ContentType }; } await MoveMessageToOutputQueue(outputMessage).ConfigureAwait( false ); } |
To make sure the higher rates are handled, and that the function does not starve, one can update the host.json as follows:
1 2 3 4 5 6 | { "serviceBus" : { "maxConcurrentCalls" : 32, // Threads will mostly be awaiting "prefetchCount" : 256 // The more the merrier } } |
After some testing it turns out that the rate limiter is almost perfect on “low rates” such as 100 messages/second. Due limitations of the Service Bus and other factors this code will not surpass ~600 messages/second. In such cases, multiple instances of the rate limiter should be deployed, and dividing the load between them.