BlockingCollection and IProducerConsumerCollection

In .NET 4.0 there is a new namespace on the block, and it is called System.Collections.Concurrent. In this namespace you will find a pretty decent number of goodies that will help you more easily write applications that leverage multiple threads without having to resort to manual locking. We looked previously at ConcurrentBag, ConcurrentStack, and ConcurrentQueue. Well, I have two more goodies to show off: the BlockingCollection class and the IProducerConsumerCollection interface.

In order to get this party started, let me explain exactly what IProducerConsumerCollection is and why we would want to use it. The IProducerConsumerCollection interface is pretty simple and really exposes only two methods that we care about: TryAdd and TryTake. You see, IProducerConsumerCollection is designed for multi-threaded producer/consumer scenarios, meaning situations where we have multiple threads producing pieces of data (producers) and multiple threads trying to consume those pieces of data (consumers). So, IProducerConsumerCollection just signifies that the implementing type supports thread-safe adding and removing of data. (In .NET 4.0, the types that support this interface are the ones that we already looked at: ConcurrentStack, ConcurrentQueue, and ConcurrentBag.)
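To make those two methods concrete, here is a minimal single-threaded sketch that fills and drains a collection purely through the interface. The class name ProducerConsumerDemo and the helper Drain are made up for this illustration; only TryAdd, TryTake, ConcurrentQueue, and ConcurrentStack come from the framework.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class ProducerConsumerDemo
{
    // Hypothetical helper: adds 1, 2, 3 through the interface, then removes
    // everything and reports the order in which the items came back out.
    public static string Drain(IProducerConsumerCollection<int> collection)
    {
        for (int i = 1; i <= 3; i++)
        {
            collection.TryAdd(i);
        }

        var taken = new List<int>();
        int item;
        // TryTake returns false (instead of throwing) once the collection is empty.
        while (collection.TryTake(out item))
        {
            taken.Add(item);
        }
        return string.Join(",", taken);
    }

    public static void Main()
    {
        Console.WriteLine("Queue: " + Drain(new ConcurrentQueue<int>())); // Queue: 1,2,3
        Console.WriteLine("Stack: " + Drain(new ConcurrentStack<int>())); // Stack: 3,2,1
    }
}
```

The same Drain code works against either type; which item TryTake hands back is decided entirely by the implementation behind the interface.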

And this is where BlockingCollection comes in. The BlockingCollection type is very interesting in that it is just a wrapper over an IProducerConsumerCollection. It doesn’t actually implement an underlying storage mechanism; it needs an instance of an IProducerConsumerCollection in order to store data. What it provides is a mechanism for the specific scenario where you have one or more threads that are producing values (producers) and one or more threads that are consuming values from the collection (consumers). This means that we can use any type which supports IProducerConsumerCollection, and if we don’t give the BlockingCollection an implementation, it will use ConcurrentQueue (FIFO) by default.
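As a rough sketch of what swapping the backing store does, the snippet below runs the same Add and Take calls against a default BlockingCollection and against one wrapped around a ConcurrentStack. The names BackingStoreDemo and AddThenTake are invented for the example, and everything runs on one thread so the ordering is easy to see.

```csharp
using System;
using System.Collections.Concurrent;

public static class BackingStoreDemo
{
    // Hypothetical helper: adds three items, then takes three items back out.
    public static string AddThenTake(BlockingCollection<string> collection)
    {
        collection.Add("first");
        collection.Add("second");
        collection.Add("third");
        // Take would block on an empty collection, but three items are waiting here.
        return collection.Take() + "," + collection.Take() + "," + collection.Take();
    }

    public static void Main()
    {
        // No argument: BlockingCollection stores its items in a ConcurrentQueue (FIFO).
        var fifo = new BlockingCollection<string>();
        // Passing a ConcurrentStack makes the same calls behave as LIFO.
        var lifo = new BlockingCollection<string>(new ConcurrentStack<string>());

        Console.WriteLine(AddThenTake(fifo)); // first,second,third
        Console.WriteLine(AddThenTake(lifo)); // third,second,first
    }
}
```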

In order to show off the main use case for the BlockingCollection class, let’s look at a multi-threaded scenario in which our normal collection classes will blow up spectacularly. Let’s say that we have a queue and we want to put work into that queue, and have other threads consuming the items that are put into the queue in order to perform some operation on them.

(We will use Tasks from the System.Threading.Tasks namespace in order to fire off async operations. Go read my post on Tasks if you have never used them.):

var queue = new Queue<string>();
Task.Factory.StartNew(() =>
{
    while (true)
    {
        queue.Enqueue("value");
    }
});

In this case, the producer is just adding the same value over and over into the queue. We will then create some consumers which will use this collection and try to loop over its values:

Task.Factory.StartNew(() =>
{
    while (true)
    {
        if (queue.Count > 0)
        {
            string value = queue.Dequeue();
            Console.WriteLine("Worker 1: " + value);
        }
    }
    
});

Task.Factory.StartNew(() =>
{
    while (true)
    {                    
        if (queue.Count > 0)
        {
            string value = queue.Dequeue();
            Console.WriteLine("Worker 2: " + value);
        }
    }
    
});

Now, I hope it is clear that this is NOT GOING TO WORK. It might appear to work for a bit, and if the load is low, it might run for a while. But if we let this run at a high load for more than a few minutes we will get errors, because one of the threads will check whether the queue is empty and then try to pull an item out, and get an exception because the other thread already removed the item. A classic race condition. On top of that, Queue<T> makes no thread-safety guarantees, so concurrent Enqueue and Dequeue calls can corrupt its internal state. In fact, when I ran it, it even started popping a bunch of null values off the queue, which I think is because the internal Count of items dropped below zero and the Dequeue method only checks to see if Count equals zero, not less than zero.

So while this code doesn’t work properly, this scenario is a very popular one. Having worker threads pull their own work is a great way of distributing it. If you want more parallelism, then you just spin up more consumers who will then start pulling work.

So, what would we have to do in order to support this scenario? One way would be to use the lock keyword to serialize all calls to the queue. This might be an okay solution if each work item is heavy, but it is going to hurt performance badly if we have work items that are very light or a huge number of consumers. Well, in .NET 4.0 we have several lock-free data structures available to us. In fact, as you saw in one of my previous posts, one of them is a concurrent queue. If we used the ConcurrentQueue, we could write the code like this:

var queue = new ConcurrentQueue<string>();
int count = 0; // counter so that each produced value is distinct
Task.Factory.StartNew(() =>
{
    while (true)
    {
        queue.Enqueue("value" + count);
        count++;                    
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {
        string value;
        if (queue.TryDequeue(out value))
        {
            Console.WriteLine("Worker 1: " + value);
        }
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {
        string value;
        if (queue.TryDequeue(out value))
        {
            Console.WriteLine("Worker 2: " + value);
        }
        
    }
});

Which isn’t bad, but wouldn’t it be nice if we could just try and get an item and have the queue block if no items are available? Sure it would be! And this is why we have BlockingCollection. It implements this exact behavior, and a little extra. BlockingCollection takes an IProducerConsumerCollection in its constructor, or it will use a ConcurrentQueue by default if you call its empty constructor. Then all you have to do is call "Add" or "Take" on the BlockingCollection: Take will block while the collection is empty, and Add will block only if you gave the collection a bounded capacity and it is full. So the above code would look like this:

var blockingCollection = new BlockingCollection<string>();
int count = 0; // counter so that each produced value is distinct
Task.Factory.StartNew(() =>
{
    while (true)
    {
        blockingCollection.Add("value" + count);
        count++;                    
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {                    
        Console.WriteLine("Worker 1: " + blockingCollection.Take());
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {
        Console.WriteLine("Worker 2: " + blockingCollection.Take());
    }
});

Well that is a tiny bit better, but we still have those random while statements. Wouldn’t it be nice if we didn’t need those? Yes, again, it would be! Don’t you wish I’d stop asking rhetorical questions? Ha, of course you do. :-) Anyways, BlockingCollection implements this behavior for us as well with a method called "GetConsumingEnumerable". What we do is simply call this method and then iterate over the resulting IEnumerable and it blocks until it finds work items! Now that is nice! So one of the workers from above would look like this:

Task.Factory.StartNew(() =>
{
    foreach (string value in blockingCollection.GetConsumingEnumerable())
    {
        Console.WriteLine("Worker 1: " + value);
    }                
});

This code will sit there forever (or until CompleteAdding is called on the collection), iterating over the blocking collection and blocking whenever it runs out of items. As soon as new items start appearing in the collection, it will start enumerating them again! Very easy!
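Putting the pieces together, here is a small end-to-end sketch under a few assumptions: the producer has a known, finite amount of work, it calls CompleteAdding when it is done so that the consumers' foreach loops end, and the consumers only count items instead of printing them. The names PipelineDemo and Run are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class PipelineDemo
{
    // Hypothetical helper: produces itemCount items, consumes them on
    // consumerCount tasks, and returns how many items were processed in total.
    public static int Run(int itemCount, int consumerCount)
    {
        int processed = 0;
        using (var work = new BlockingCollection<string>())
        {
            var consumers = new Task[consumerCount];
            for (int i = 0; i < consumerCount; i++)
            {
                consumers[i] = Task.Factory.StartNew(() =>
                {
                    // Blocks while the collection is empty; the loop ends once
                    // CompleteAdding has been called and the collection is drained.
                    foreach (string value in work.GetConsumingEnumerable())
                    {
                        Interlocked.Increment(ref processed);
                    }
                });
            }

            for (int i = 0; i < itemCount; i++)
            {
                work.Add("value" + i);
            }
            work.CompleteAdding(); // tell the consumers that no more items are coming

            Task.WaitAll(consumers);
        }
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine("Processed: " + Run(1000, 2)); // Processed: 1000
    }
}
```

Each item is handed to exactly one consumer, so the total always matches the number of items produced, however the work happens to be split between the tasks.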

Summary

And there you have it. BlockingCollection is an incredibly easy way to have a few threads producing data and have numerous other threads picking up and processing that same data. You can switch out its underlying storage mechanism in order to change where items go when you add them and where they come from when you take them, while the code that calls Add and Take stays completely abstracted away from the underlying data store. I hope that you find some great uses for the BlockingCollection class, and I hope you enjoyed this post!


10 comments

  1. You should mention CompleteAdding(), which signifies that no more items will be added. If GetConsumingEnumerable is used, the foreach will end gracefully. If Take is used, it will throw an InvalidOperationException once the collection is empty and marked complete (TryTake simply returns false). For the foreach case, it’s quite useful.

    var bc = new BlockingCollection<int>();

    Task generator = Task.Factory.StartNew(() => {
        for (int i = 1; i < 10; i++) {
            bc.Add(i);
            Thread.Sleep(300);
        }
        // No more items are coming, so the consumers' foreach loops can end.
        bc.CompleteAdding();
    });

    Task consumer1 = Task.Factory.StartNew(() => {
        foreach (int number in bc.GetConsumingEnumerable())
            Console.WriteLine("Consumer 1: {0}", number);
        Console.WriteLine("Consumer 1 complete");
    });
    Task consumer2 = Task.Factory.StartNew(() => {
        foreach (int number in bc.GetConsumingEnumerable())
            Console.WriteLine("Consumer 2: {0}", number);
        Console.WriteLine("Consumer 2 complete");
    });

    Task.WaitAll(generator, consumer1, consumer2);

    Console.ReadKey(true);

  2. Isn’t there something like Java’s notify to handle this in a cleaner way than busy waiting?

  3. Justin Etheredge

    I’m not familiar with Java’s notify system, so I’m not really sure how to answer that question. Sorry!

  4. Sanjeev – The BlockingQueue in Java uses the notify system under the covers. I assume in .NET BlockingCollection is implemented by using Monitor.Wait and Monitor.Pulse.

    Basically .NET 4.0 ripped off what’s been available in Java since 1.5!

  5. Again, Outstanding example…it is so simple and clear. Thank You!

    Now to benchmark the performance against the standard queue and blocking threads (contention).

  6. Thank you, I had struggled understanding how to use this. After this post it is absolutely clear!

    Thank you Justin!!!

  7. This is a great explanation! Thanks!

  8. @Clive

    “Basically .NET 4.0 ripped off what’s been available in Java since 1.5!”

    What did you expect MS would do? Invent something probably less intuitive just to be different? This btw applies to .net as a whole, which was pretty much modeled after Java.

    So what? Copying something that works is better than inventing crap. Look eg at Windows, especially pre-NT, for example. What an utter piece of crap it was. And that despite a variety of Unix OSes existing for decades that could have served as models.

  9. @Horst

    A common principle is “don’t reinvent the wheel yourself”.

    MS is trying to make the “wheel” out-of-box; still it doesn’t restrict people inventing their own common library at all.

  10. I tried using the blockingcollection in a super simple scenario with one producer and one consumer thread. I was puzzled at the poor performance and found that it was one of the BlockingCollection methods causing it, forget which one.

    I switched to ConcurrentQueue with some additional logic I added to batch things up, and that improved perf by, I think, over 10x.

    Given that, if you are using these things in tight loop hoping to spread the load to several cores, if the work item is small, BlockingCollection overhead may be churning CPU a great deal. I didn’t really study using CQ without batching, but I suspect the overhead is due to the frequent blocking that happened with BC.
