Implement your own Parallel.For in C#

This article is intended for .NET 2.0 or 3.5. If you are on .NET 4, use the System.Threading.Tasks.Parallel class instead.
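For comparison, on .NET 4 and later the built-in method is System.Threading.Tasks.Parallel.For. Its simplest overload takes the same (fromInclusive, toExclusive, Action<int>) arguments as the class in this post. Below is a minimal sketch that sums squares. BuiltInParallelDemo and SumOfSquares are hypothetical names, and Interlocked.Add keeps the shared total consistent across threads:

```csharp
using System;
using System.Threading;

public static class BuiltInParallelDemo
{
    // Hypothetical helper: sums i*i for i in [fromInclusive, toExclusive)
    // using the .NET 4 System.Threading.Tasks.Parallel.For.
    public static long SumOfSquares(int fromInclusive, int toExclusive)
    {
        long total = 0;
        System.Threading.Tasks.Parallel.For(fromInclusive, toExclusive, i =>
        {
            // Many iterations run at once, so the shared total is
            // updated atomically with Interlocked.Add.
            Interlocked.Add(ref total, (long)i * i);
        });
        return total;
    }
}
```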

I was thinking about how Parallel.For could be implemented, so I wrote my own. I am using it in my own project, and it scales very well.

Here it is:

using System;
using System.Threading;

public class Parallel
{
    /// <summary>
    /// Parallel for loop. Invokes the given action on multiple threads,
    /// passing it each integer from fromInclusive up to (but not
    /// including) toExclusive. Returns when the loop has finished.
    /// </summary>
    public static void For(int fromInclusive, int toExclusive, Action<int> action)
    {
        // chunkSize = 1 hands the items out to the threads in order
        // (they still run concurrently, so they may finish out of order).
        // A bigger chunk size should reduce lock waiting time and thus
        // increase parallelism.
        int chunkSize = 4;

        // number of process() workers
        int threadCount = Environment.ProcessorCount;
        int index = fromInclusive - chunkSize;
        // locker object shared by all the process() delegates
        object locker = new object();

        // processing function:
        // takes the next chunk and processes it using action.
        // ThreadStart (a parameterless void delegate) is used because the
        // parameterless Action delegate does not exist in .NET 2.0, and an
        // anonymous method cannot be assigned to var.
        ThreadStart process = delegate()
        {
            while (true)
            {
                int chunkStart = 0;
                lock (locker)
                {
                    // take the next chunk
                    index += chunkSize;
                    chunkStart = index;
                }
                // process the chunk
                // (other threads are processing other chunks,
                //  so the real order of items will be out of order)
                for (int i = chunkStart; i < chunkStart + chunkSize; i++)
                {
                    if (i >= toExclusive) return;
                    action(i);
                }
            }
        };

        // launch process() on thread-pool threads
        // (delegate BeginInvoke is a .NET Framework feature)
        IAsyncResult[] asyncResults = new IAsyncResult[threadCount];
        for (int i = 0; i < threadCount; ++i)
        {
            asyncResults[i] = process.BeginInvoke(null, null);
        }
        // wait for all workers to complete
        // (EndInvoke rethrows any exception thrown by action)
        for (int i = 0; i < threadCount; ++i)
        {
            process.EndInvoke(asyncResults[i]);
        }
    }
}


As noted in the code, setting chunkSize to 1 hands the items out to the threads in order (they still run concurrently, so they may finish out of order). With a bigger chunk size, items are processed in a mixed, non-deterministic order, which is fine in some applications, for example when you want to modify all items of a collection or render the lines of a picture.
A bigger chunkSize should reduce lock waiting time and thus increase overall speed. But too big a chunkSize is bad too: if the work is split into only a few big parts, not enough parallelism is exposed, and we could find ourselves waiting for the last single thread to finish its large chunk.
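The lock in the code above exists only to hand out chunk starts atomically. As a swapped-in technique (not the code in this post), Interlocked.Add can claim a chunk without blocking on a lock, although the trade-off between chunk size and load balance stays the same. The sketch below also takes chunkSize as a parameter so it can be tuned per loop. It starts plain Thread objects, because delegate BeginInvoke throws PlatformNotSupportedException on .NET Core. ChunkedParallel is a hypothetical name:

```csharp
using System;
using System.Threading;

public static class ChunkedParallel
{
    // Hypothetical variant of Parallel.For: chunkSize is a parameter and
    // chunks are claimed with Interlocked.Add instead of a lock.
    public static void For(int fromInclusive, int toExclusive, int chunkSize, Action<int> action)
    {
        if (chunkSize < 1) throw new ArgumentOutOfRangeException("chunkSize");

        // As in the post, index starts one chunk before fromInclusive,
        // so the first claim returns fromInclusive.
        int index = fromInclusive - chunkSize;
        Exception firstError = null;

        ThreadStart worker = delegate()
        {
            try
            {
                while (true)
                {
                    // Atomically advance index and read the new value:
                    // that is the start of the chunk this thread now owns.
                    int chunkStart = Interlocked.Add(ref index, chunkSize);
                    for (int i = chunkStart; i < chunkStart + chunkSize; i++)
                    {
                        if (i >= toExclusive) return;
                        action(i);
                    }
                }
            }
            catch (Exception ex)
            {
                // Keep only the first failure; it is rethrown after all threads end.
                Interlocked.CompareExchange(ref firstError, ex, null);
            }
        };

        Thread[] threads = new Thread[Environment.ProcessorCount];
        for (int t = 0; t < threads.Length; t++)
        {
            threads[t] = new Thread(worker);
            threads[t].Start();
        }
        foreach (Thread thread in threads)
        {
            thread.Join();
        }

        if (firstError != null)
            throw new InvalidOperationException("A loop iteration failed.", firstError);
    }
}
```

Calling ChunkedParallel.For(0, 1000, 1, action) reproduces the in-order handout described above, while a larger chunkSize trades ordering and balance for fewer Interlocked operations.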

Using Parallel.For is simple:

Parallel.For(0, 1000, delegate(int i)
{
    // your parallel code
    Thread.Sleep(100);
    Console.WriteLine(i);
});
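Because the iterations run concurrently, the Console.WriteLine calls above print in a non-deterministic order. If ordered output matters, one common pattern is to let each iteration write only to its own slot of an array, which needs no lock, and then print in a sequential pass after the loop. So that the sketch compiles on its own, it calls .NET 4's System.Threading.Tasks.Parallel.For; the loop body would be the same with the Parallel class from this post. OrderedOutputDemo and SquaresInOrder are hypothetical names:

```csharp
using System;
using System.Text;

public static class OrderedOutputDemo
{
    // Computes i*i in parallel, then returns the results one per line
    // in index order.
    public static string SquaresInOrder(int count)
    {
        int[] results = new int[count];
        System.Threading.Tasks.Parallel.For(0, count, delegate(int i)
        {
            // Each iteration touches only its own index: no data race.
            results[i] = i * i;
        });

        // Sequential pass: items come out in index order regardless of
        // which thread computed them.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++)
        {
            sb.AppendLine(results[i].ToString());
        }
        return sb.ToString();
    }
}
```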


Posted by Martin Konicek on 10:18 PM

8 comments:

Jonas Elfström said...

Thanks! I've used it here http://alicebobandmallory.com/articles/2010/01/14/prime-factorization-in-parallel

Kevin Gadd said...

Why are you locking on typeof(Parallel)? You should just create an instance of Object in the For setup code and use that as your lock for all the parallel workers. As a result you'd be able to execute two Parallel.Fors at once without contending, also. Locking on types is not a good idea.

Anonymous said...

Wouldn't it be easier to just use ThreadPool instead of manually reimplementing it? (http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx)
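For reference, delegate BeginInvoke, as used in the post, already runs its work on .NET thread-pool threads, so the post is not reimplementing a pool. If you prefer to call the pool explicitly, here is a sketch that keeps the post's chunk logic but queues the workers with ThreadPool.QueueUserWorkItem and waits on a ManualResetEvent instead of EndInvoke. PoolParallel is a hypothetical name, and the chunk size of 4 is copied from the post:

```csharp
using System;
using System.Threading;

public static class PoolParallel
{
    // Hypothetical variant: same chunked loop as the post, launched with
    // ThreadPool.QueueUserWorkItem and joined with a ManualResetEvent.
    public static void For(int fromInclusive, int toExclusive, Action<int> action)
    {
        const int chunkSize = 4;
        int workerCount = Environment.ProcessorCount;
        int index = fromInclusive - chunkSize;
        object locker = new object();
        int pending = workerCount;          // workers still running
        Exception firstError = null;

        using (ManualResetEvent allDone = new ManualResetEvent(false))
        {
            WaitCallback worker = delegate(object unused)
            {
                try
                {
                    while (true)
                    {
                        int chunkStart = 0;
                        lock (locker)
                        {
                            index += chunkSize;   // claim the next chunk
                            chunkStart = index;
                        }
                        for (int i = chunkStart; i < chunkStart + chunkSize; i++)
                        {
                            if (i >= toExclusive) return;
                            action(i);
                        }
                    }
                }
                catch (Exception ex)
                {
                    // Keep only the first failure.
                    Interlocked.CompareExchange(ref firstError, ex, null);
                }
                finally
                {
                    // The last worker to finish releases the waiting caller.
                    if (Interlocked.Decrement(ref pending) == 0) allDone.Set();
                }
            };

            for (int w = 0; w < workerCount; w++)
            {
                ThreadPool.QueueUserWorkItem(worker);
            }
            allDone.WaitOne();
        }

        if (firstError != null)
            throw new InvalidOperationException("A loop iteration failed.", firstError);
    }
}
```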

Rogerio Perez said...

Hello,

I love your post on Parallel.For.
I implemented it as a test in a project I'm developing as a Windows Service.

I'm having some problems: the service scans a SQL database table that returns approximately 2 million records, performs some actions on them, and finally writes the result of each operation to a text file.

Looking at the text file, I realized that it mostly works, but some records are corrupted because several threads try to write to the file at the same time, so the written values come out wrong.

How can I make sure a result is written only after the other thread has finished writing?

It is worth noting that I'm using a server with 16 processor cores.

Part of the code:

// Create the log file
FileStream fs = new FileStream("C:\\Logs\\001_log.txt", FileMode.OpenOrCreate, FileAccess.Write, FileShare.Read);
StreamWriter fp = new StreamWriter(fs);

Parallel.For(0, table.Rows.Count, (k) =>
{
    try
    {
        Int32 iReturn = ProcRegister((Int32)table.Rows[0][0]);
        fp.WriteLine(iReturn);
        fp.Flush();
    }
    catch {}
    finally {}
});

fp.Close();
fp.Dispose();
fs.Dispose();
fs.Close();

Part of the log file showing the problem:

...
985217
991848
985680
988684
989989
983342
7367 983342 <---
7367 983342 <---
7367 <---
984540
986244
986016
984964
991237
...
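One way to address this, sketched under the assumption that only the writes need serializing: StreamWriter is not thread-safe, so concurrent WriteLine calls from different iterations can interleave and produce lines like the ones marked above. Wrapping the writer so that WriteLine and Flush run under one lock prevents that, while the expensive per-record work (ProcRegister in the snippet) stays outside the lock and still runs in parallel. SafeLog is a hypothetical helper; the framework's TextWriter.Synchronized(fp) returns a similar thread-safe wrapper. Note also that the snippet reads table.Rows[0][0] on every iteration; presumably table.Rows[k][0] was intended, otherwise every iteration processes the same record.

```csharp
using System;
using System.IO;

// Hypothetical helper: serializes writes coming from concurrent loop iterations.
public sealed class SafeLog
{
    private readonly TextWriter writer;
    private readonly object writeLock = new object();

    public SafeLog(TextWriter writer)
    {
        if (writer == null) throw new ArgumentNullException("writer");
        this.writer = writer;
    }

    // Only one thread at a time can write and flush, so lines never interleave.
    public void WriteLine(int value)
    {
        lock (writeLock)
        {
            writer.WriteLine(value);
            writer.Flush();
        }
    }
}
```

In the loop body, fp.WriteLine(iReturn); fp.Flush(); would then become log.WriteLine(iReturn);, with log created as new SafeLog(fp) before the loop starts.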

Anonymous said...

There is an error in the code. The parameter should be an Action<int>, because you take an int as a parameter. Also, var process = delegate() won't compile here; I had to change it to Action process = delegate(). Anyway, you helped me out! Thanks!

Unknown said...

Nice post, very helpful.

Alex said...

This does not compile in .NET 3.5, because process cannot be declared as var. I'm unfamiliar enough with delegates that I don't know what its type should be to avoid this error.
