codethinked (kōdthĭngked) adj. To be consumed by or obsessed with code.

Seeing the Future in ParallelFX

One of the neat features of ParallelFX is tasks. Tasks are simply wrappers that let you easily start parallel operations. To create a task, all you have to do is this:

  Task task = Task.Create(n => LongRunningMethod(test));

The "n" parameter that you see is just the state that you can pass to the task. There is an overload that lets you pass it in if you want to, like this (oh, and C# should have true optional parameters; I don't care what Anders thinks, overloads are a messy solution </rant>):

  string state = "Richmond, VA";
  Task task = Task.Create(n => LongRunningMethod((string)n), state);

Once created, a task starts running immediately, assuming a thread is available for it to run on (by default, ParallelFX uses as many threads as there are processors on your box). You can hook up an event that notifies your application when the task has finished, or you can check its "IsCompleted" property and wait for it to finish:

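  // Option 1: get notified through the Completed event
  task.Completed += TaskCompleted;
 
  // Option 2: poll IsCompleted until the task has finished
  while (!task.IsCompleted)
  {
    Thread.Sleep(1000);
  }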
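For comparison, here is a rough sketch of the same three ideas (start a task, pass it state, find out when it is done) using the Task Parallel Library as it later shipped in .NET 4 and up, not the CTP API used in this post. There, tasks are started with Task.Factory.StartNew, a continuation (ContinueWith) can stand in for the Completed event, and Wait() blocks until the task finishes, so no polling loop is needed. LongRunningMethod below is a hypothetical stand-in that just sleeps and upper-cases its input.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the post's LongRunningMethod.
string LongRunningMethod(string input)
{
    Thread.Sleep(100);                  // simulate slow work
    return input.ToUpperInvariant();
}

// The state object ("richmond") is handed to the lambda as its parameter.
Task<string> task = Task.Factory.StartNew(
    state => LongRunningMethod((string)state), "richmond");

// A continuation runs after the task finishes, much like handling a Completed event.
Task report = task.ContinueWith(t => Console.WriteLine($"Finished: {t.Result}"));

// Wait() blocks until the task is done, so there is no need to poll IsCompleted.
task.Wait();
report.Wait();
Console.WriteLine(task.IsCompleted);    // True
```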

So, what happens when you need to fire off a Task and then get a resulting value from it, but you don't know when the Task will finish? Well, this is when you use a Future. A Future (which, surprisingly, implements the Future pattern) is simply a Task wrapped with a result property, Value, that waits for the Task to complete.

  Future<string> future = 
    Future.Create<string>(() => LongRunningMethod(test));
 
  string result = future.Value;

This code fires off "LongRunningMethod" on a separate thread and then gets its return value. If the thread is not done when I request the return value, the call to Value waits for the thread to finish before continuing.
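In the Task Parallel Library that shipped with .NET 4, the Future<T> idea survives as Task<TResult>: its Result property blocks until the value is ready, just like Value does here. A minimal sketch, assuming .NET 4.5 or later for Task.Run, with a hypothetical LongRunningMethod that sleeps and doubles its argument:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the post's LongRunningMethod.
int LongRunningMethod(int n)
{
    Thread.Sleep(200);                      // simulate slow work
    return n * 2;
}

// Like Future.Create, Task.Run starts the work right away and hands back a handle.
Task<int> future = Task.Run(() => LongRunningMethod(21));

Console.WriteLine("Doing other work while the task runs...");

// Like Future.Value, Result blocks until the value is available.
int result = future.Result;
Console.WriteLine(result);                  // 42
```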

While playing with this code I realized that the Future class provides no overload for passing state into a Future. So I promptly went to the MSDN forums and asked whether they were planning to add one. I got a response saying that they are certainly considering it. I sure hope so! That would make something like this possible:

  string[] names = { "Richmond, VA", "Washington, DC", "Boston, MA",
                "Los Angeles, CA", "Las Vegas, NV", "Seattle, WA" };
 
  List<string> results = new List<string>();
  List<Future<string>> futures = new List<Future<string>>();
  foreach (string name in names)
  {      
    futures.Add(
      Future.Create<string>(n => LongRunningMethod((string)n), name));
  }
 
  foreach (Future<string> future in futures)
  {
    results.Add(future.Value);
  }
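For what it's worth, the TPL that shipped with .NET 4 does include a state-passing overload of this kind, TaskFactory.StartNew<TResult>(Func<object, TResult>, object). Here is a sketch of the loop above written against it; LongRunningMethod is again a hypothetical stand-in (it just returns the city part of each name). Passing name as state also sidesteps a closure pitfall: before C# 5, a foreach loop variable was shared by every lambda that captured it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

string[] names = { "Richmond, VA", "Washington, DC", "Boston, MA",
                   "Los Angeles, CA", "Las Vegas, NV", "Seattle, WA" };

// Hypothetical stand-in for the post's LongRunningMethod.
string LongRunningMethod(string city) => city.Split(',')[0].Trim();

var futures = new List<Task<string>>();
foreach (string name in names)
{
    // The state object (name) arrives in the lambda as "n", so each task
    // gets its own city without relying on closure capture.
    futures.Add(Task.Factory.StartNew(n => LongRunningMethod((string)n), name));
}

var results = new List<string>();
foreach (Task<string> future in futures)
{
    results.Add(future.Result);   // blocks until that particular task finishes
}
```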

Of course you can also accomplish this with Parallel LINQ (another part of ParallelFX), but that is for another post! One last thing: if you need to tell ParallelFX how many threads to use, you can do it like this:

  TaskManagerPolicy policy = new TaskManagerPolicy(0, 10);
  TaskManager manager = new TaskManager(policy);
 
  Future<string>.Create(() => LongRunningMethod("value"), manager);

That code would tell ParallelFX to use a minimum of 0 threads with an ideal thread count of 10.

Well, this post is pretty short, but Tasks and Futures really are that simple. Parallel programming has never been easier!

Just Do it! Parallel.Do in ParallelFX

The other night at the geek dinner I was speaking with a colleague who said they had read about ParallelFX on my blog, but weren't really sure what use they had for it in their environment. Well, it isn't always about high-performance computing and matrix multiplication; sometimes it is about something as simple as sorting strings or making web service calls. So, you know what, I decided to sort some strings. The Array class has a quicksort implementation, but there is no parallel version of it. So, I first decided that I needed to implement a parallel quicksort, which is actually quite easy. Since quicksort is a recursive algorithm that splits its work into two independent halves, it is almost as if it were built to be parallelized.

If you don't remember exactly how quicksort works, here is a partial listing that shows the main QuickSort method. We start with the partition method, which picks a pivot value and moves every value greater than the pivot to its right and every value less than it to its left. Then we recursively call QuickSort on both sides, working on smaller and smaller ranges until everything is sorted. Because the two sides never overlap, all we have to do is make these two recursive QuickSort calls in parallel and the algorithm just works. How easy is that?

[Image: listing of the single-threaded QuickSort method]
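The original listing was an image and is not recoverable here, so below is a minimal sequential quicksort sketch in the same shape, not the code from the post. It assumes a Lomuto-style partition combined with the "median of 3" pivot choice described later in this post; the names Partition and Swap are illustrative.

```csharp
using System;

// Sequential quicksort over list[left..right] (inclusive bounds).
void QuickSort<T>(T[] list, int left, int right) where T : IComparable<T>
{
    if (left >= right) return;                 // zero or one element: nothing to do
    int p = Partition(list, left, right);      // pivot is now in its final slot
    QuickSort(list, left, p - 1);              // sort everything smaller than the pivot
    QuickSort(list, p + 1, right);             // sort everything not smaller than it
}

// Lomuto partition with a median-of-three pivot.
int Partition<T>(T[] list, int left, int right) where T : IComparable<T>
{
    int mid = left + (right - left) / 2;
    // Order the first, middle and last items so list[mid] holds their median.
    if (list[mid].CompareTo(list[left]) < 0) Swap(list, mid, left);
    if (list[right].CompareTo(list[left]) < 0) Swap(list, right, left);
    if (list[right].CompareTo(list[mid]) < 0) Swap(list, right, mid);
    Swap(list, mid, right);                    // park the median at the end as the pivot
    T pivot = list[right];
    int store = left;
    for (int i = left; i < right; i++)
    {
        if (list[i].CompareTo(pivot) < 0)      // smaller values go to the left side
        {
            Swap(list, i, store);
            store++;
        }
    }
    Swap(list, store, right);                  // drop the pivot between the two sides
    return store;
}

void Swap<T>(T[] list, int a, int b) => (list[a], list[b]) = (list[b], list[a]);
```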

So, how would we do that with ParallelFX? Well, my first thought was to use Tasks, which are ParallelFX objects that let you run Action delegates. For example, if I needed to call a method named "DoSomething", I could run it on a separate thread like this:

  var task = System.Threading.Tasks.Task.Create(() => DoSomething());

But then I would have had to fire off two tasks and write code to wait until both of them finished. It turns out that ParallelFX already provides an easy way to do this, called "Parallel.Do". Using "Parallel.Do", the above code would look like this:

  System.Threading.Parallel.Do(() => DoSomething());

But the best part comes in when you do this:

  System.Threading.Parallel.Do(() => DoSomething(), 
                            () => DoSomethingElse());
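In the Task Parallel Library as it shipped in .NET 4, this method is called Parallel.Invoke, and it has the same contract: it takes any number of Action delegates and returns only after all of them have finished. A tiny sketch of that contract (the Thread.Sleep calls just simulate work):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int first = 0, second = 0;

// Parallel.Invoke runs the actions (possibly concurrently) and only returns
// once every one of them has completed.
Parallel.Invoke(
    () => { Thread.Sleep(50); first = 1; },
    () => { Thread.Sleep(50); second = 2; });

Console.WriteLine(first + second);   // 3: both actions finished before Invoke returned
```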

This allows us to pass as many actions as we like into this method, and it doesn't return until they are all done. Exactly what we needed. So the quicksort above will now look like this (I also made it generic):

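public void QuickSort<T>(T[] list, int left, int right)
    where T : IComparable<T>
{
  // Base case: a range of zero or one elements is already sorted.
  if (left >= right)
    return;

  int partitionIndex = partition(list, left, right);

  // The two sides don't overlap, so they can be sorted at the same time.
  Parallel.Do(
    () => QuickSort(list, left, partitionIndex - 1),
    () => QuickSort(list, partitionIndex + 1, right));
}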
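One caveat with the version above is that it forks all the way down to the smallest ranges, and each fork has some overhead. A common refinement is to fall back to plain recursion below a size threshold. Here is a sketch of that idea using Parallel.Invoke (the name Parallel.Do shipped under in .NET 4); the threshold of 2048 is an arbitrary assumption to tune by measuring, and the simple middle-element Lomuto partition stands in for the post's partition method.

```csharp
using System;
using System.Threading.Tasks;

// Below this many elements, forking costs more than it saves (assumed value).
const int SequentialThreshold = 2048;

void ParallelQuickSort<T>(T[] list, int left, int right) where T : IComparable<T>
{
    if (left >= right) return;                      // base case: 0 or 1 element
    int p = Partition(list, left, right);
    if (right - left < SequentialThreshold)
    {
        ParallelQuickSort(list, left, p - 1);       // small range: stay on this thread
        ParallelQuickSort(list, p + 1, right);
    }
    else
    {
        // The two halves never overlap, so they can be sorted concurrently.
        Parallel.Invoke(
            () => ParallelQuickSort(list, left, p - 1),
            () => ParallelQuickSort(list, p + 1, right));
    }
}

// Lomuto partition using the middle element as the pivot.
int Partition<T>(T[] list, int left, int right) where T : IComparable<T>
{
    int mid = left + (right - left) / 2;
    (list[mid], list[right]) = (list[right], list[mid]);        // move pivot to the end
    T pivot = list[right];
    int store = left;
    for (int i = left; i < right; i++)
    {
        if (list[i].CompareTo(pivot) < 0)
        {
            (list[i], list[store]) = (list[store], list[i]);
            store++;
        }
    }
    (list[store], list[right]) = (list[right], list[store]);    // pivot to its final slot
    return store;
}
```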

Parallel.Do also tries to reuse current threads, so (by default) you'll never use more threads than the number of processors in your system. So, what do the numbers look like when sorting arrays of strings? I wrote code to randomly generate strings between 10 and 30 characters long and populated 6 different arrays with 1000 of them. I then ran the single-threaded QuickSort three times and the parallel QuickSort three times. With 1000 items, here is the number of milliseconds that the sorts took:

[Image: sort times in milliseconds, 1,000 items]

So, obviously with 1000 items the differences are negligible. So, let's bump this up to 10,000 items.

[Image: sort times in milliseconds, 10,000 items]

At 10,000 items we are already starting to see some significant differences. So, let's bump it up one more time to 100,000 items.

[Image: sort times in milliseconds, 100,000 items]

So, there you have it. You can see a pretty good performance gain from this algorithm, but with quicksort it is extremely important to pick a good pivot value so that you split your data set as evenly as possible, especially at the start. The algorithm I am using picks its pivot with the "median of 3" method: it takes the first, last, and middle items in the range and uses the median of those three. If you are dealing with large sets of data, and considering that we are running this in parallel (an uneven split leaves one thread with most of the work), it may make sense to spend even more time trying to find a good pivot point.

So, now that I've shown you that you can sort a bunch of strings faster (or integers, or dates, etc.), how about making long-running web service calls? If I have 10 web service calls to make and only two processors in my box, then ParallelFX will only use 2 threads, right? That would be faster than making the calls one at a time, but nowhere near as efficient as it could be, since those threads spend most of their time waiting on the network. And yes, that is true, but that is only the default; we can go a step further and tell ParallelFX how many threads to use.

  // Method groups, not calls: each entry is a delegate that runs later.
  Action[] actions = {   CallWebservice1,
                        CallWebservice2,
                        CallWebservice3,
                        CallWebservice4,
                        CallWebservice5};
 
  //1 for MinThreads, array length for IdealThreads
  var policy = new TaskManagerPolicy(1, actions.Length);
  TaskManager manager = new TaskManager(policy);
  Parallel.Do(actions, manager, TaskCreationOptions.None);
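With the TPL as it shipped in .NET 4 there is no TaskManagerPolicy; the closest knob is ParallelOptions.MaxDegreeOfParallelism, which is an upper limit on concurrency rather than an ideal thread count. A sketch under that assumption, with a hypothetical CallWebservice that only sleeps:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int completed = 0;

// Hypothetical stand-in for a slow web service call: mostly waiting, little CPU.
void CallWebservice(int id)
{
    Thread.Sleep(200);
    Interlocked.Increment(ref completed);
}

Action[] actions =
{
    () => CallWebservice(1), () => CallWebservice(2), () => CallWebservice(3),
    () => CallWebservice(4), () => CallWebservice(5),
};

// MaxDegreeOfParallelism caps how many actions may run at once; it is not
// an "ideal" thread count like TaskManagerPolicy.
var options = new ParallelOptions { MaxDegreeOfParallelism = actions.Length };
Parallel.Invoke(options, actions);

Console.WriteLine(completed);   // 5
```

Keep in mind that the thread pool may still add threads gradually, and for calls that mostly wait on the network, asynchronous calls awaited together with Task.WhenAll are usually a better fit than blocking threads.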

Now this is a bit of a contrived example, because you have to know ahead of time what all of your calls are going to be. You cannot easily pass information into any of these calls, and I'll show you why. Let's say we have some code like this, which passes in a list of cities:

  string[] names = { "Richmond, VA", "Washington, DC", "Boston, MA",
                "Los Angeles, CA", "Las Vegas, NV", "Seattle, WA" };
 
  Action[] actions = new Action[names.Length];
  float[] results = new float[names.Length];
 
  for (int i = 0; i < names.Length; i++)
  {
      actions[i] = () => CallWebService(names[i]);
  }
 
  //1 for MinThreads, array length for IdealThreads
  var policy = new TaskManagerPolicy(1, names.Length);
  TaskManager manager = new TaskManager(policy);
  Parallel.Do(actions, manager, TaskCreationOptions.None);

So, you can see that we have "CallWebService" and we are passing in "names[i]". When "Parallel.Do" is actually called, we end up with an IndexOutOfRangeException! Why is that? Well, we are putting our call to "CallWebService" inside a parameterless lambda. This creates a closure that captures the loop variable "i" itself, not its value at the moment the lambda was created. So, by the time "Parallel.Do" runs the actions, the loop has finished and "i" equals names.Length, one past the last valid index. For an operation like this you are going to be better off using "AsParallel" with "ForAll", like this:

  names.AsParallel(names.Length).ForAll(name => CallWebService(name));

What is happening here is that we are using "AsParallel" to get an IParallelEnumerable, and we are passing names.Length to the "DegreeOfParallelism" parameter. This tells our enumerable to use as many threads as there are items in our array. If we need to get results back, we can call "Select" instead of "ForAll". We also need to preserve ordering so that our results can be correlated with our input data (this may or may not be important to you):

  var results = names.AsParallel
    (ParallelQueryOptions.PreserveOrdering, names.Length)
    .Select(name => CallWebService(name));

So, here you can see that we are passing the "PreserveOrdering" parameter as well as the number of threads, and then we call Select passing in our array item and the results are returned to our "results" variable as another IParallelEnumerable.
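Two follow-ups, sketched against the APIs that shipped in .NET 4 (Parallel.Invoke and PLINQ) rather than the CTP, with a hypothetical CallWebService that just returns the length of the name. First, the usual fix for the IndexOutOfRangeException above is to copy "i" into a variable declared inside the loop body, so every lambda captures its own copy. Second, in shipped PLINQ, ordering and thread count are set with AsOrdered() and WithDegreeOfParallelism() instead of arguments to AsParallel.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

string[] names = { "Richmond, VA", "Washington, DC", "Boston, MA",
                   "Los Angeles, CA", "Las Vegas, NV", "Seattle, WA" };

// Hypothetical stand-in for the post's CallWebService: returns the name's length.
float CallWebService(string city) => city.Length;

// 1) Closure fix: "index" is declared inside the loop body, so each lambda
//    captures its own copy instead of the single shared loop variable "i".
var actions = new Action[names.Length];
var results = new float[names.Length];
for (int i = 0; i < names.Length; i++)
{
    int index = i;
    actions[i] = () => results[index] = CallWebService(names[index]);
}
Parallel.Invoke(new ParallelOptions { MaxDegreeOfParallelism = names.Length }, actions);

// 2) Shipped PLINQ: AsOrdered() keeps output in input order, and
//    WithDegreeOfParallelism() caps how many tasks process the query.
float[] ordered = names.AsParallel()
                       .AsOrdered()
                       .WithDegreeOfParallelism(names.Length)
                       .Select(name => CallWebService(name))
                       .ToArray();
```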

So, now you can see how you would use ParallelFX to operate on an array of values, but this could easily be used for anything that supports IEnumerable. You have also seen an example of passing data to a web service, but you could also use this for long running database calls or any other long running process. Hopefully you have found this interesting and it helps you out.

A tale of Parallel.For in ParallelFX

Most of us learned in school how to do matrix multiplication, and then most of us probably promptly forgot it. It is a topic that is neither challenging nor interesting, but it will give us a good starting point for my foray into ParallelFX. If you need a quick refresher on matrix multiplication you can go here. First we are going to start with a very basic matrix multiplication algorithm that you can find in about a million places on the interwebs.

  for (int i = 0; i < resultMatrixHeight; i++)
  {
    for (int j = 0; j < resultMatrixWidth; j++)
    {
      for (int k = 0; k < matrix1Width; k++)
      {
        result1[i, j] += matrix1[i, k] * matrix2[k, j];
      }
    }
  }

In this algorithm we simply walk through the result matrix cell by cell, and for each cell we add up the products of the corresponding row of the first matrix and column of the second. I chose it because it is a pretty simple algorithm and it is O(n³), so it will take a while to run on large input matrices. This will let us see big performance gains when we start writing our super awesome parallel algorithms. I don't know about you, but I'm feeling smarter already.

To run my tests I have created two matrices, one is 2000 x 2000 square and the other is 3400 x 2000. I decided to use very large matrices so that I could get a better idea of the performance difference in my parallel algorithms. I am also populating these matrices with random integers between 1 and 20. I am running these tests on a machine with these specs:

[Image: screenshot of system specs]

How bad is it when you are so lazy that you take a screenshot of your system specs rather than type them in? So, you can see I am on a dual-core CPU so we can get some decent parallel numbers, but not like we would if I had a quad core box. If anyone wants to send me one, please feel free.

During the single-threaded run my system saw the expected 50% load (note that all performance screenshots were taken during different runs than the results I am posting, because Task Manager affects the results):

[Image: CPU usage during the single-threaded run]

Our results for the single threaded run looked like this:

[Image: single-threaded run results]

So it took about 160 seconds to process the 4 million entries in the result matrix. Pretty fast, we clearly don't need to parallelize this, it is already fast. So, hope you enjoyed the post!

Ha, I wish. Before we look at my first stab at a parallel algorithm, let's first look at the Parallel.For method. The signature of the overload we are going to use looks like this (it is the most basic overload of this method):

  public static void For(int fromInclusive, 
    int toExclusive, Action<int> body);

The first parameter is the integer you are starting from, in our case this is 0. The second parameter is the last number we are going to, but this number is exclusive. So, if we pass 0 and 100 then we are going to iterate from 0 to 99. Next the method takes an Action<int> delegate. An Action<int> delegate returns void and takes an integer as its single parameter. So, it just passes in the current index that you are operating on.
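A quick way to convince yourself of the inclusive/exclusive bounds is a loop that counts its iterations. This sketch uses the Parallel.For(int, int, Action<int>) overload as it shipped in .NET 4, where it returns a ParallelLoopResult that we simply ignore; Interlocked is used because the body runs on several threads at once.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int iterations = 0;
long indexSum = 0;

// Runs the body for i = 0, 1, ..., 99; the toExclusive value (100) is never passed in.
Parallel.For(0, 100, i =>
{
    Interlocked.Increment(ref iterations);
    Interlocked.Add(ref indexSum, i);
});

Console.WriteLine(iterations);   // 100
Console.WriteLine(indexSum);     // 4950 (0 + 1 + ... + 99)
```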

So, my first stab at parallelizing this using Parallel.For looked like this:

  Parallel.For(0, resultHeight,
    i =>
    {
      for (int j = 0; j < resultWidth; j++)
      {
        for (int k = 0; k < matrix1Width; k++)
        {
          Interlocked.Add(ref result2[i, j], 
            matrix1[i, k] * matrix2[k, j]);
        }
      }
    }
  );

Initially I thought that this might be a *bit* faster than the single threaded algorithm, but the overhead involved with the Interlocked.Add was just too much. During the execution of the run my CPU did spike to 100%:

[Image: CPU usage during the first Parallel.For run]

But the overhead was just too much and our results ended up looking like this:

[Image: results of the first Parallel.For run]

It took over 210 seconds now to process the same size matrices! Well, that does not provide us with promising results, but it is *possible* that with a quad-core box we could overcome the cost of Interlocked.Add and still process these matrices faster. The good news, though, is that we don't need four cores; we can find a better way to do this.

I looked at our previous parallel algorithm and thought, "how can we get rid of the call to Interlocked.Add?" Well, the answer is to use ThreadLocalState, another feature of the Parallel.For method. You can give it a type that will be used to maintain state within a single thread (not for each iteration) in our method. The overload that we are going to use looks like this:

  public static void For<TLocal>(int fromInclusive, 
    int toExclusive, Func<TLocal> threadLocalSelector, 
    Action<int, ParallelState<TLocal>> body);

A wee bit more complicated than our last Parallel.For overload, so it needs a bit of explanation. First there is a generic type parameter, "TLocal", that tells the method what type our ThreadLocalState is going to be. The first two parameters work exactly as they did before, but the next parameter is a Func delegate that returns a TLocal. It is named threadLocalSelector and is used to initialize our thread state; in our case the type is "int", so we just return 0. The last parameter is another Action delegate like the one we used before, only this time it takes two parameters: our index and a ParallelState<TLocal>, which exposes the ThreadLocalState.

So, I decided to go ahead and use this ThreadLocalState in my algorithm in order to accumulate the results. My implementation looks like this:

  Parallel.For<int>(0, resultHeight,
    () => 0,
    (i, state) =>
    {
      for (int j = 0; j < resultWidth; j++)
      {
        state.ThreadLocalState = 0;
        for (int k = 0; k < matrix1Width; k++)
        {
          state.ThreadLocalState += matrix1[i, k] * matrix2[k, j];
        }
        Interlocked.Add(ref result3[i, j], state.ThreadLocalState);
      }
    }
  );
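In the API that shipped in .NET 4 the thread-local overload looks a little different: the body receives and returns the running local value, and a separate localFinally delegate runs once per worker task, which is the natural place to merge into shared state. The sketch below sums an array that way, so Interlocked.Add runs once per worker instead of once per element. Note the contrast with the code above, which resets the thread-local value for every cell and so uses it much like an ordinary local variable.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int[] values = new int[1_000_000];
for (int n = 0; n < values.Length; n++) values[n] = n % 10;

long total = 0;

Parallel.For<long>(0, values.Length,
    () => 0L,                                              // localInit: each worker starts at 0
    (i, loopState, subtotal) => subtotal + values[i],      // body: no shared writes at all
    subtotal => Interlocked.Add(ref total, subtotal));     // localFinally: one merge per worker

Console.WriteLine(total);   // 4500000
```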

So, each time we move to a new position in our result array, we zero out our thread state and then loop through and do our calculations, adding the results to our local state. That way we avoid calling Interlocked.Add until we reach the end of the calculations for that cell. Then we call Interlocked.Add once to add the accumulated value to the appropriate place in our result array. The results end up looking like this:

[Image: results of the ThreadLocalState run]

Now that is a bit more like it. Quite a bit faster than our original single threaded algorithm. After I saw this I patted myself on the back and began to move on, until I realized that I was an idiot.

Realization 1: notice how my multi-threaded algorithm actually takes less than half of the time to run than my original single threaded algorithm. How is this?

Realization 2: This whole time I had my head so stuck in the "parallel programming means lock everything" mindset that I failed to realize my algorithm didn't require any locking! We aren't modifying matrix1 or matrix2 at all, so obviously they don't require locking. And since we are dividing the work among threads on the "i" index, no two threads will ever write to the same cell of our result array, because they will always have different values of "i"!

Realization 1 made me understand that in my ThreadLocalState implementation, a lot of the speedup came from no longer adding directly to the result array on every pass through the inner loop. Those writes meant a large number of extra array lookups, so accumulating into the ThreadLocalState removed that cost as well, which is why the speedup was even bigger than the second core alone could explain.

Realization number 2 made me feel a bit sheepish. After I noticed that I really didn't need to have locking, I decided that a rewrite of the algorithm was in order.

So, with this under my belt, I produced this:

  Parallel.For(0, resultHeight,        
    i =>
    {
      for (int j = 0; j < resultWidth; j++)
      {
        int sum = 0;
        for (int k = 0; k < matrix1Width; k++)
        {              
          sum += matrix1[i, k] * matrix2[k, j];
        }
        result3[i, j] = sum;
      }
    }
  );
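Here is a self-contained sketch of the lock-free version using the Parallel.For that shipped in .NET 4, alongside a sequential version with the same local-sum loop so the two can be checked against each other. The matrix sizes are deliberately small so it runs in a moment; the helper names are illustrative, not from the post.

```csharp
using System;
using System.Threading.Tasks;

int[,] RandomMatrix(int rows, int cols, Random gen)
{
    var m = new int[rows, cols];
    for (int r = 0; r < rows; r++)
        for (int c = 0; c < cols; c++)
            m[r, c] = gen.Next(1, 21);   // random integers between 1 and 20
    return m;
}

int[,] MultiplySequential(int[,] a, int[,] b)
{
    int rows = a.GetLength(0), inner = a.GetLength(1), cols = b.GetLength(1);
    var result = new int[rows, cols];
    for (int i = 0; i < rows; i++)
        for (int j = 0; j < cols; j++)
        {
            int sum = 0;
            for (int k = 0; k < inner; k++) sum += a[i, k] * b[k, j];
            result[i, j] = sum;
        }
    return result;
}

int[,] MultiplyParallel(int[,] a, int[,] b)
{
    int rows = a.GetLength(0), inner = a.GetLength(1), cols = b.GetLength(1);
    var result = new int[rows, cols];
    // Each iteration owns row i of the result, so no two threads write the same
    // cell and no locking is needed; a and b are only ever read.
    Parallel.For(0, rows, i =>
    {
        for (int j = 0; j < cols; j++)
        {
            int sum = 0;
            for (int k = 0; k < inner; k++) sum += a[i, k] * b[k, j];
            result[i, j] = sum;
        }
    });
    return result;
}

var random = new Random(1);
var m1 = RandomMatrix(200, 150, random);
var m2 = RandomMatrix(150, 120, random);
var seq = MultiplySequential(m1, m2);
var par = MultiplyParallel(m1, m2);
```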

Now look how simple that is. And unless I am totally missing something (which is quite possible), there is no locking needed. So, running our newest algorithm, we get this result:

[Image: results of the lock-free Parallel.For run]

There you have it: we shaved over 15 seconds off our last time, and this run took almost exactly half as long as the single-threaded algorithm once I rewrote that one to use a local sum variable too (the rewritten single-threaded method took 121 seconds). So, I hope you enjoyed my little foray through the Parallel.For method, and hopefully your adventure with ParallelFX won't waste quite as much time as mine did. :-) Also, I'm sure that someone is going to point out how this algorithm can be faster, so I am going to go ahead and ask for suggestions on how to make it faster. Anyone have any ideas?

ParallelMap function using ParallelFX

In a previous post I implemented a ParallelMap method using the ThreadPool class and some fancy schmancy code. My method ended up looking like this:

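  public static IEnumerable<TResult> ParallelMap<TArg, TResult>(
          this IEnumerable<TArg> list,
          Func<TArg, TResult> func)
  {
    List<TResult> result = new List<TResult>();
    using (var resetEvent = new ManualResetEvent(false))
    {
      // Start at 1 so the event cannot be set before every item is queued.
      int count = 1;
      foreach (TArg item in list)
      {
        // Copy the loop variable: before C# 5, all closures shared one "item".
        TArg current = item;
        WaitCallback run =
          n =>
          {
            TResult value = func(current);
            lock (result)
            {
              result.Add(value);
            }
            // The last work item to finish signals the waiting thread.
            if (Interlocked.Decrement(ref count) == 0)
            {
              resetEvent.Set();
            }
          };
        Interlocked.Increment(ref count);
        ThreadPool.QueueUserWorkItem(run);
      }
      // Release the initial count (this also covers an empty list).
      if (Interlocked.Decrement(ref count) == 0)
      {
        resetEvent.Set();
      }
      resetEvent.WaitOne();
    }
    return result;
  }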

Well, I have been wanting to play with ParallelFX for a while and after Daniel Moth commented on a blog post the other day I saw his ParallelFX videos and started watching some of them. I then began playing around with some of the code in the latest CTP and rewrote the ParallelMap method above using ParallelFX:

  public static IEnumerable<TResult> ParallelMap<TArg, TResult>(
    this IEnumerable<TArg> list,
    Func<TArg, TResult> func)
  {
    List<TResult> result = new List<TResult>();
    Parallel.ForEach<TArg>(list,
      (TArg n) =>
      {
        TResult val = func(n);
        lock (result)
        {
          result.Add(val);
        }
      });
    return result;
  }

Pretty simple. I use the same method signature as before and declare my result list, but then all I have to do is call Parallel.ForEach, passing in my list as the first parameter along with a lambda that takes a TArg (I put the type there to make it clearer what is being passed in). In my lambda I call my method, get the result, and add it to the result list. I still have to lock on the result since this code is being executed on multiple threads.

In a later post I implemented a ParallelMap method that maintained order. It was even bigger than the method above, and it even required a separate struct to hold the index and the item. So, I decided to go ahead and try to rewrite that method using the Parallel Extensions:

  public static IEnumerable<TResult> ParallelMap<TArg, TResult>(
    this IEnumerable<TArg> list,
    Func<TArg, TResult> func)
  {
    TResult[] result = new TResult[list.Count()];
    Parallel.ForEach<TArg>(list,        
      (TArg n, int i) =>
      {
        result[i] = func(n);
      });
    return result;
  }
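In the API that shipped in .NET 4, the indexed overload of Parallel.ForEach passes the item, a ParallelLoopState and a long index, rather than the CTP's (TArg, int). Here is a sketch of the order-preserving map against that overload. It is written as a plain generic function rather than an extension method so the example fits in one file, and it materializes the source with ToArray() so the sequence is enumerated only once (the version above enumerates it twice, once for Count() and once in ForEach).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Order-preserving map: each result goes into the slot matching its source index.
TResult[] ParallelMap<TArg, TResult>(IEnumerable<TArg> list, Func<TArg, TResult> func)
{
    TArg[] items = list.ToArray();              // enumerate the source only once
    var result = new TResult[items.Length];
    // This overload passes (item, loopState, index), with index as a long.
    Parallel.ForEach(items, (item, loopState, index) =>
    {
        result[index] = func(item);             // distinct slots, so no lock is needed
    });
    return result;
}

int[] squares = ParallelMap(Enumerable.Range(1, 10), x => x * x);
Console.WriteLine(string.Join(", ", squares));  // 1, 4, 9, 16, 25, 36, 49, 64, 81, 100
```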

Wow. If I hadn't been writing my earlier post for fun, I would have felt like I wasted a lot of time. :-) I just declare an array instead of a list, call Parallel.ForEach, and change my lambda to take the optional index parameter. This lets me store each result in my array at the same position its input had in the original IEnumerable. Pretty awesome. I'm going to have to play around with the other parts of ParallelFX and hopefully make a few posts on it in the future. Hope you enjoyed!