Rate limiting an Azure queue

In my previous post (“Singleton” Azure Functions) I offered a tip on how to set up an Azure Function to serve as a system-wide “singleton”. One of the use cases I had in mind was a service acting as a broker between two Service Bus queues. I will not go into details; suffice it to say, there is a requirement to only allow, let’s say, 100 messages per second to pass through the queue.

As I am a strong proponent of “less talk, more code”, here is a rough version that gets the job done:

private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private static readonly QueueClient _outputQueueClient = new QueueClient("{connection string}", "{output queue name}");
private static double _nextDispatchAt;
private const double _rate = 100; // the rate!
 
/* Method creates a task scheduled to run approximately
   at a given "time" */
private static Task MoveMessageToOutputQueue(Message message)
{
    Contract.Assert(message != null);
 
    /* Get a lock to make sure _nextDispatchAt is modified
       atomically. This is a lazy implementation but it does the job. */
    _semaphore.Wait();
    try
    {
        /* Calculate the next "click". */
        var now = DateTime.UtcNow.Ticks;
        var delta = _nextDispatchAt - now;
        var click = TimeSpan.TicksPerMillisecond * (1000 / _rate);
        if (delta > 0)
        {
            /* We need to wait until the next dispatch. */
            var millisToWait = delta / TimeSpan.TicksPerMillisecond;
            _nextDispatchAt += click;
 
            /* If the number of ticks is below the millisecond
               threshold, execute directly. */
            return millisToWait > 0 ?
                Task.Delay((int)millisToWait).ContinueWith(t => _outputQueueClient.SendAsync(message)).Unwrap() :
                _outputQueueClient.SendAsync(message);
        }
        else
        {
            /* The execution is direct as there is no waiting time. */
            _nextDispatchAt = now + click;
            return _outputQueueClient.SendAsync(message);
        }
    }
    finally
    {
        _semaphore.Release();
    }
}
 
/* This is the actual Azure function which is set up to listen on the "input-queue" */
[FunctionName(nameof(MoveMessageFromInputToOutput))]
public static async Task MoveMessageFromInputToOutput(
[ServiceBusTrigger("input-queue")] Microsoft.ServiceBus.Messaging.BrokeredMessage message,
TraceWriter log)
{
    Contract.Assert(message != null);
    Contract.Assert(log != null);
    log.Verbose($"Moving incoming message with label \"{message.Label}\" ...");
 
    /* Make a copy of the message (just the body and whatever fields are interesting). */
    Message outputMessage;
    using (var ms = new MemoryStream())
    {
        await message.GetBody<Stream>().CopyToAsync(ms).ConfigureAwait(false);
        outputMessage = new Message(ms.ToArray())
        {
            Label = message.Label,
            ContentType = message.ContentType
        };
    }
 
    await MoveMessageToOutputQueue(outputMessage).ConfigureAwait(false);
}
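A quick sanity check of the pacing arithmetic above: one “click” is the minimum spacing between dispatches, expressed in ticks. At the configured rate of 100 messages/second this works out to 10 ms between sends. The tiny snippet below (a standalone illustration, not part of the function) verifies that:

```csharp
using System;

class ClickDemo
{
    static void Main()
    {
        // Same formula as in MoveMessageToOutputQueue:
        // one "click" = the tick interval between two dispatches.
        const double rate = 100; // messages per second
        long click = (long)(TimeSpan.TicksPerMillisecond * (1000 / rate));

        Console.WriteLine(click);                                // 100000 ticks
        Console.WriteLine(click / TimeSpan.TicksPerMillisecond); // 10 ms apart
    }
}
```

Note that `TimeSpan.TicksPerMillisecond` is 10,000, so the millisecond threshold mentioned in the comments simply means “less than 10,000 ticks of waiting rounds down to an immediate send”.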

To make sure the higher rates are handled, and that the function does not starve, one can update the host.json as follows:

{
    "serviceBus": {
        "maxConcurrentCalls": 32, // Threads will mostly be awaiting
        "prefetchCount": 256 // The more the merrier
    }
}

After some testing, it turns out that the rate limiter is almost perfect at “low” rates such as 100 messages/second. Due to limitations of Service Bus and other factors, this code will not surpass ~600 messages/second. In such cases, multiple instances of the rate limiter should be deployed and the load divided between them.
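One way to divide the load is to give each deployed limiter an equal share of the global rate and route each message to a fixed instance by hashing a stable key (the label, a session id, etc.). The sketch below is purely hypothetical: the instance count, global rate, and routing key are assumptions, and the hand-rolled hash is used only because `string.GetHashCode()` is randomized per process on .NET Core:

```csharp
using System;

class ShardingSketch
{
    const int InstanceCount = 4;        // assumption: four limiter deployments
    const double GlobalRate = 2000;     // assumption: target throughput overall
    const double PerInstanceRate = GlobalRate / InstanceCount; // 500 each

    // Deterministic hash so the same key always routes to the same instance,
    // even across restarts (unlike string.GetHashCode on .NET Core).
    static int RouteToInstance(string key)
    {
        int h = 0;
        foreach (char c in key) h = unchecked(h * 31 + c);
        return ((h % InstanceCount) + InstanceCount) % InstanceCount;
    }

    static void Main()
    {
        Console.WriteLine(PerInstanceRate);            // 500
        Console.WriteLine(RouteToInstance("order-42")  // stable: same key,
            == RouteToInstance("order-42"));           // same instance -> True
    }
}
```

Each instance would then run the rate limiter above with `_rate` set to its per-instance share. Keyed routing also preserves per-key ordering, which a round-robin split would not.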