Parallel Collection Operations

In one of my previous posts we defined a Map function that allows you to apply an operation to a collection and get back a copy with the operation applied. This map function looked like this:

  public static IEnumerable<TResult> Map<TArg, TResult>(
      this IEnumerable<TArg> list,
      Func<TArg, TResult> func)
  {
    List<TResult> result = new List<TResult>();
    foreach (TArg item in list)
    {
      result.Add(func(item));
    }
    return result;
  }

As you can see we create a new list and then loop through our original list calling the passed in function and adding the result to the new list. This list is returned and all is well…right? Right. But what happens if our function takes a while to run? I came across something similar to this when I was thinking about my Linq To SimpleDB provider. I wanted to load up a bunch of the attributes asynchronously, but I wanted a super easy way to do it. I wanted an extension method that I could just call like this:

  IEnumerable<string> result2 = values.ParallelMap(
    n => {
      return LongRunningMethod(n); 
    }
  );

This would then asynchronously run my fancy little lambda here for each item in the collection. So, I figured that I would loop through the collection just like I did before, only this time I would create a closure inside of the "foreach" loop and have it bind against the loop’s "item" variable. I could then call my lambda, pass the value in, and add its result to the resulting collection. Simple, right? That was somewhat sarcastic :-) So here was my first stab at a ParallelMap method:

  public static IEnumerable<TResult> ParallelMap<TArg, TResult>(
          this IEnumerable<TArg> list,
          Func<TArg, TResult> func)
  {
    List<TResult> result = new List<TResult>();
    using (var resetEvent = new ManualResetEvent(false))
    {
      int count = 0;
      foreach (TArg item in list)
      {
        WaitCallback run =
          n =>
          {
            TResult value = func(item);
            lock (result)
            {
              result.Add(value);
            }
            if (Interlocked.Decrement(ref count) == 0)
            {
              resetEvent.Set();
            }
          };
        Interlocked.Increment(ref count);
        ThreadPool.QueueUserWorkItem(run);
      }
      resetEvent.WaitOne();
    }
    return result;
  }

Now, this may look pretty good, but there is one little problem… it doesn’t really work. And why, pray tell, does it not work? Well, I’m glad you asked. Check out this little bit of reverse engineered goodness from Reflector:

[Image: ReflectorForEach, Reflector view of the compiler-generated closure class]

This is one of the classes that the C# compiler generates in order to deal with closures. Essentially it creates this class so that the closure (in this case the lambda inside our ParallelMap method) can be bound to the surrounding local variables. You can see the local variables, and you can see the "TArg item" field that this class has. The class exists because our closure sits inside the loop, so the compiler binds the closure to the loop variable itself rather than to a copy of its value. But we don’t really want this, because when the loop variable changes, so does the field the closure reads. So if our async functions start to back up in the ThreadPool, you will get closures whose "item" variable has changed before they get a chance to run, and we get some funky results back. (Compilers from C# 5 onward give each "foreach" iteration its own variable, so this particular capture only bites on older compilers or in a "for" loop.) The fix is to move the closure out of the loop so that it is not bound to the loop’s variables. All we need to do is redefine it so that the value is passed into the closure as an argument.
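To make the capture problem concrete, here is a minimal sketch that is separate from the ParallelMap code. The class and method names are made up for illustration. It uses a "for" loop rather than "foreach" because a "for" loop variable is shared across iterations in every version of C#, so the effect is reproducible on any compiler.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical demo class, not part of the original post.
public static class CaptureSketch
{
    // Every lambda captures the single shared loop variable "i".
    public static List<int> CapturedLoopVariable()
    {
        var callbacks = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            callbacks.Add(() => i);
        }
        // The lambdas only run after the loop has finished, so each one sees i == 3.
        return Invoke(callbacks);
    }

    // Each lambda captures its own per-iteration copy of the value.
    public static List<int> CapturedCopy()
    {
        var callbacks = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            int copy = i;
            callbacks.Add(() => copy);
        }
        // Each lambda sees the value its copy had when it was created: 0, 1, 2.
        return Invoke(callbacks);
    }

    private static List<int> Invoke(List<Func<int>> callbacks)
    {
        var results = new List<int>();
        foreach (Func<int> callback in callbacks)
        {
            results.Add(callback());
        }
        return results;
    }
}
```

Queued thread pool work items behave like the delayed callbacks here: they read the captured variable when they run, not when they were created.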

Another problem I noticed was that sometimes the result collection came back with fewer items than my original collection. I figured out that the count was reaching zero prematurely: because it was incremented inside the loop, an early work item could finish and decrement it back to zero before the next item had been queued, which set the event and let us exit before all of the threads had run. This was easily fixed by setting the count to the number of items in my list up front and then only decrementing from there. Here are these two fixes in a new method:

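  public static IEnumerable<TResult> ParallelMap<TArg, TResult>(
      this IEnumerable<TArg> list,
      Func<TArg, TResult> func)
  {
    List<TResult> result = new List<TResult>();
    // Start at the full item count so the counter cannot hit zero early.
    int count = list.Count();
    if (count == 0)
    {
      // Nothing gets queued, so nothing would ever signal the event.
      return result;
    }
    using (var resetEvent = new ManualResetEvent(false))
    {
      // Defined outside the loop: the item arrives as the state argument
      // instead of being captured from the loop variable.
      WaitCallback run =
          n =>
          {
            TResult value = func((TArg)n);
            lock (result)
            {
              result.Add(value);
            }
            if (Interlocked.Decrement(ref count) == 0)
            {
              resetEvent.Set();
            }
          };

      foreach (TArg item in list)
      {
        ThreadPool.QueueUserWorkItem(run, item);
      }
      resetEvent.WaitOne();
    }
    return result;
  }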

In this method the closure is created before we ever get to the loop, so the reflected class you saw above is never even generated. The trade-off is that we now have to pass our item from the loop as a parameter to the closure, but when you run it, it works like a charm. There is one caveat though: if ordering is important to you, then you don’t want to use this sort of method. Because all of these delegates run asynchronously, they may finish out of order and therefore put items into the result list in a different order than your original collection.
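If ordering does matter, one possible workaround (a sketch of my own, not necessarily the approach the author has in mind) is to snapshot the input into an array and have each work item write into the slot matching its index. The name OrderedParallelMap is hypothetical. Like the method above, it does not handle exceptions thrown inside func.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical sketch, not part of the original post.
public static class OrderedParallelSketch
{
    public static IEnumerable<TResult> OrderedParallelMap<TArg, TResult>(
        this IEnumerable<TArg> list,
        Func<TArg, TResult> func)
    {
        // Snapshot the input so every item has a stable index.
        TArg[] items = list.ToArray();
        TResult[] result = new TResult[items.Length];
        if (items.Length == 0)
        {
            // Nothing to queue, so nothing would ever signal the event.
            return result;
        }

        int count = items.Length;
        using (var resetEvent = new ManualResetEvent(false))
        {
            WaitCallback run =
                state =>
                {
                    int index = (int)state;
                    // Each work item writes only to its own slot, so no lock is needed.
                    result[index] = func(items[index]);
                    if (Interlocked.Decrement(ref count) == 0)
                    {
                        resetEvent.Set();
                    }
                };

            for (int i = 0; i < items.Length; i++)
            {
                // The index is passed as state rather than captured from the loop.
                ThreadPool.QueueUserWorkItem(run, i);
            }
            resetEvent.WaitOne();
        }
        return result;
    }
}
```

The work items can still finish in any order. Only the position of each result is fixed, at the cost of copying the input into an array first.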

So, what kind of performance advantage can you see on something like this? Well, it all depends on how long-running your calls are and how many you have to make. If you have calls that take 30 seconds each, then you will see a huge performance increase because you’ll be able to make several calls at once and get results faster. This will work especially well if your algorithm spends time idle, such as waiting on a call to a web service or a database.

Let’s look at a simple test. In order to add a tiny amount of lag I have created a method that looks like this:

  public static string LongRunningMethod(string s)
  {
    Thread.Sleep(1);
    return s.ToUpper();
  }

It will pause for 1 millisecond, but that is enough to create an artificial lag for us to see the difference in execution speeds. We will then call this method with the "Map" and "ParallelMap" functions like this:

  IEnumerable<string> result = values.Map(
    n => {
      return LongRunningMethod(n);
    }
  );
 
  IEnumerable<string> result2 = values.ParallelMap(
    n => {
      return LongRunningMethod(n); 
    }
  );

I have wrapped each of these method calls in a "Stopwatch" so that I can time their execution. The "values" collection has 49,962 entries in it that we are going to loop through. Here is the resulting output of our test:

[Image: MapVParallelMapOutput, console output of the Map versus ParallelMap timing test]
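For reference, a timing harness along these lines might look roughly like the sketch below. The Time helper and the TimingSketch class are my own illustrative names, not the code from the download, and the three-item array stands in for the real "values" collection.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical sketch, not part of the original post.
public static class TimingSketch
{
    // Runs the action once and reports how long it took.
    public static TimeSpan Time(Action action)
    {
        Stopwatch watch = Stopwatch.StartNew();
        action();
        watch.Stop();
        return watch.Elapsed;
    }

    // Same artificial lag as the method in the post.
    public static string LongRunningMethod(string s)
    {
        Thread.Sleep(1);
        return s.ToUpper();
    }

    // Times a plain sequential pass over a small stand-in collection.
    public static TimeSpan Demo()
    {
        string[] values = { "alpha", "beta", "gamma" };
        return Time(() =>
        {
            foreach (string value in values)
            {
                LongRunningMethod(value);
            }
        });
    }
}
```

Timing the call itself works for Map and ParallelMap because both build their result list before returning. A lazily evaluated query would need to be enumerated inside the timed action, or the stopwatch would measure almost nothing.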

Yep, 98 seconds versus about 10. If you had a longer running operation then the difference could potentially be even more pronounced. This test was also run on a dual core CPU so that is helping boost it a bit as well. But as Will Ferrell would say, "the results look glooooooooooooorious". Also, keep in mind that you don’t have to return the same type that you are using in your original collection. The ParallelMap method creates a new result collection rather than modifying your original, so you can do something like this as well:

  IEnumerable<int> result2 = values.ParallelMap(
    n => {
      return n.Length; 
    }
  );

I’m not sure why you would want to do this, considering you currently can’t guarantee the ordering of the items in the list (I have a solution to that problem that I am going to blog about soon), but you could, and maybe you’ll find an interesting use for it. You may also be tempted to look at the above and try to increment a counter to do something like add up the length of all the items, like this:

  int wordCount = 0;
  IEnumerable<int> result2 = values.ParallelMap(
    n =>
    {
      return wordCount += n.Length;
    }
  );

But you shouldn’t do that. The "+=" on a shared variable is a read followed by a write, so two threads can overwrite each other’s updates. You have to consider concurrent access, which means that you’ll need to put in locking, which means you just punched the whole purpose of multithreading right in the face. Bam! Your method will end up looking like this (and it’ll be far slower):

  Object obj = new Object();
  int wordCount = 0;
  IEnumerable<int> result2 = values.ParallelMap(
    n =>
    {
      lock (obj)
      {
        return wordCount += n.Length;
      }
    }
  );
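A way to sidestep the shared counter is to keep the lambda free of side effects and combine the results afterwards. The sketch below uses PLINQ (AsParallel, from the Parallel Extensions that ship with .NET 4) as a stand-in for ParallelMap so that it is self-contained. With the method from this post, the same shape would be values.ParallelMap(n => n.Length).Sum(). The class and method names are hypothetical.

```csharp
using System;
using System.Linq;
using System.Threading;

// Hypothetical sketch, not part of the original post.
public static class WordCountSketch
{
    // Map each item to its length in parallel, then add the lengths up
    // once the parallel part is done. The lambda touches no shared state.
    public static int TotalLengthBySumming(string[] values)
    {
        return values.AsParallel().Select(n => n.Length).Sum();
    }

    // If a shared counter really is needed, Interlocked.Add performs the
    // read-modify-write atomically without a lock block.
    public static int TotalLengthWithInterlocked(string[] values)
    {
        int total = 0;
        values.AsParallel().ForAll(n => Interlocked.Add(ref total, n.Length));
        return total;
    }
}
```

Because addition does not care about order, the unordered results are not a problem here.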

Once you start modifying shared state, you are in trouble. And you wonder why functional languages are so big on immutability? So, this is your ParallelMap function kids, have fun and be careful. You might speed up some of your code, but you could put your eye out. Also, if you see any issues with this code, or you have some other method you would like to see a parallel version of, please leave a comment. Now I am going to go to sleep and hopefully not dream of being attacked by lambdas.

You can download the code for the extension methods here: Parallelicious.zip


3 comments

  1. great post and good luck with your LINQ provider for SimpleDB

  2. Nice post!

    You should give a look to the new Parallel Framework for .NET, it’s just about this.

    Although I knew of the existence of ParallelFX, I enjoyed reading your post: seeing a parallel for running at this level of detail is very interesting :)

    Keep up the good work!

  3. Thanks! My latest post is actually about ParallelFX, you should check it out.
