BlackWaspTM

This web site uses cookies. By using the site you accept the cookie policy.This message is for compliance with the UK ICO law.

Parallel and Asynchronous
.NET 4.0+

The ConcurrentBag<T> Collection

With the introduction of the parallel task library, which simplifies parallel programming, the .NET framework was extended with several thread-safe collections. One of the simpler such collections is ConcurrentBag<T>.

Thread-Safe Bag

A standard type of collection is the bag, otherwise known as the multiset. A bag is a very simple structure that holds a number of objects or values, possibly including duplicate items, with no particular order. This type of collection is useful for situations where the order in which items are added and removed is unimportant.

The ConcurrentBag<T> collection, in the System.Collections.Concurrent namespace, provides a multiset that is thread-safe. The collection allows you to add and remove items freely from multiple threads without having to worry about thread synchronisation. The fact that the bag has no ordering allows it to be particularly efficient when you have multiple threads or parallel tasks where each both adds and removes items.

Internally, the structure of the ConcurrentBag<T> is quite interesting. When a thread or task adds its first item to the bag, a new collection is created that is local to that thread. All items that a thread adds are added to its local collection. When a thread requests an item from the bag, the local collection acts as a stack, returning the top item, which is the latest item that was added.

The behaviour changes when a thread tries to extract an item from the bag when its local stack is empty. At this point, the thread uses appropriate locking of another thread's local collection and removes an item from the bottom of that list. As the thread that owns that collection will be removing items from the top of the list, the chances that either thread will be blocked is low. This gives the efficiency boost. However, this improvement only happens in situations where all threads both add and remove items from the multiset. If you were to have one thread writing to the collection and one removing items, the thread synchronisation may lead to a collection that is slower than some of the alternatives.

The diagram below illustrates where items will be added and removed for a ConcurrentBag<T> that is being accessed by four tasks in a parallel system. Tasks one, two and three have local collections that contain items. If they add an item it will be placed on the top of their stack. Removing an item extracts from the top of the local collection. Task four has an empty thread-local collection. If it adds an item it will appear at the top of the list. If it requests an item it will be extracted from the bottom of another thread's list.

ConcurrentBag Add, Remove and Steal Process

Example

Let's create a simple example of a ConcurrentBag<T>. Create a new console application and add the following using directives to the class that contains the Main method. The first simplifies access to the ConcurrentBag<T> class. The other two are used for multi-threading and parallel programming types.

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

In our example we'll start four parallel threads of operation. Each will add a group of integers to the same bag, then extract items from the bag until it is empty. At the end of the process we'll output the task identifiers and the numbers they retrieved

To add an item to a bag you use the Add method, which accepts the new value in its parameter. Extracting an item from a bag is a similar process to that used for ConcurrentQueue<T> or ConcurrentStack<T>. You can't rely on a check that the collection is empty because there could be an item left in the bag that is removed by another thread after you check the collection but before you request a value. Instead, you use the TryTake method, using an output parameter. If an item is taken from the bag the method returns true and the output parameter holds the extracted value. If the bag is empty, the method returns false.

The code for the sample is shown below. Try running it to see the results.

static void Main(string[] args)
{
    ConcurrentBag<int> numbers = new ConcurrentBag<int>();

    Parallel.Invoke(
        () => FillAndEmptyBag(numbers, 1, 10),
        () => FillAndEmptyBag(numbers, 11, 20),
        () => FillAndEmptyBag(numbers, 21, 30),
        () => FillAndEmptyBag(numbers, 31, 50)
    );
}

private static void FillAndEmptyBag(ConcurrentBag<int> numbers, int start, int end)
{
    int value;
    var processed = new StringBuilder();

    for (int i = start; i <= end; i++)
    {
        numbers.Add(i);
    }

    while (numbers.TryTake(out value))
    {
        processed.Append(value).Append(" ");
        Thread.Sleep(1);
    }
            
    Console.WriteLine("Task {0}, Values Processed {1}", Task.CurrentId, processed);
}

/* OUTPUT

Task 2, Values Processed 10 9 8 7 6 5 4 3 2 1 31 34
Task 3, Values Processed 20 19 18 17 16 15 14 13 12 11 33 35
Task 4, Values Processed 50 49 48 47 46 45 44 43 42 41 40 39 38 37
Task 1, Values Processed 30 29 28 27 26 25 24 23 22 21 32 36

*/

The comment shows one possible output for the program. You can see that each of the four tasks starts by extracting the values that it put into the bag, in the reverse order to which they were added. Task 2, shown first, stored the values from one to ten and removed them all. Its local collection was then exhausted so it started to steal values from another thread's collection. These were taken from the bottom of the list, rather than the top, which is why the final values taken by task 2 were in ascending order. Tasks 1 and 3 were similar. However, task 4 added more values initially and only processed items from its own thread-local collection.

NB: It's important to note that all of the values were retrieved with no requirement for manual thread synchronisation.

Other Members

The example shows the two key methods of ConcurrentBag<T>. There are other methods and properties but they are of limited use. For example, the IsEmpty property returns a Boolean value that is true if the bag has no items. Count returns the number of values in the multiset. However, reacting to these values can lead to problems if other threads are adding or removing items at the same time.

Another method is TryPeek. This behaves in a similar manner to TryTake but leaves the extracted item in the bag. Again, this is of limited usefulness. You might peek at the next value and then decide to take it. However, you might not get the same item in the second operation if another thread has already taken it.

18 August 2013