Implement your own Parallel.For in C#
This article is intended for .NET 2 or 3.5. If you are on .NET 4, use System.Threading.Parallel class.
I was thinking about how Parallel.For could be implemented. I wrote my own, I am using it in my own project and it scales very well.
Here it is:
public class Parallel
{
/// <summary>
/// Parallel for loop. Invokes given action, passing arguments
/// fromInclusive - toExclusive on multiple threads.
/// Returns when loop finished.
/// </summary>
public static void For(int fromInclusive, int toExclusive, Action action)
{
// chunkSize = 1 makes items to be processed in order.
// Bigger chunk size should reduce lock waiting time and thus
// increase paralelism.
int chunkSize = 4;
// number of process() threads
int threadCount = Environment.ProcessorCount;
int index = fromInclusive - chunkSize;
// locker object shared by all the process() delegates
var locker = new object();
// locker object shared by all the process() delegates
var locker = new object();
// processing function
// takes next chunk and processes it using action
var process = delegate()
{
while (true)
{
int chunkStart = 0;
lock (locker)
{
// take next chunk
index += chunkSize;
chunkStart = index;
}
// process the chunk
// (another thread is processing another chunk
// so the real order of items will be out-of-order)
// so the real order of items will be out-of-order)
for (int i = chunkStart; i < chunkStart + chunkSize; i++)
{
if (i >= toExclusive) return;
action(i);
}
}
};
// launch process() threads
IAsyncResult[] asyncResults = new IAsyncResult[threadCount];
for (int i = 0; i < threadCount; ++i)
{
asyncResults[i] = process.BeginInvoke(null, null);
}
// wait for all threads to complete
for (int i = 0; i < threadCount; ++i)
{
process.EndInvoke(asyncResults[i]);
}
}
As noted in the code, by setting chunkSize to 1 we can make the items be processed in order. With bigger chunk size, items can be processed in mixed non-deterministic order, which is ok in some application, like when you want to modify all items of a collection or render lines of a picture.
Bigger chunkSize should reduce lock waiting time and thus increase overall speed. But too big chunkSize is bad too, because if the work is split into only a few big parts, there is not enough parallelism exposed - we could find ourselves waiting for the last single thread to finish its large chunk.
Using Parallel.For is simple:
Parallel.For(0, 1000, delegate(int i)
{
// your parallel code
Thread.Sleep(100);
Console.WriteLine(i);
});
8 comments:
Thanks! I've used it here http://alicebobandmallory.com/articles/2010/01/14/prime-factorization-in-parallel
Why are you locking on typeof(Parallel)? You should just create an instance of Object in the For setup code and use that as your lock for all the parallel workers. As a result you'd be able to execute two Parallel.Fors at once without contending, also. Locking on types is not a good idea.
Wouldn't it be easier to just use ThreadPool instead of manually reimplementing it? (http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx)
Hello,
I love your post on the Parallel.For.
Implemented as a test on a project I'm developing on a Windows Services.
I'm having some problems because the service scans a SQL database table and returns aproximadamento some 2 million records, then it performs some actions and finally it writes the result of operation fied in a text file.
Looking at the text file I realized that it works well but some records are compromised because some threads try to write the file while the value of the record is wrong.
How can I do to allow a result to be written on the bench only after a thread may have finished writing?
It is worth observing that I'm using a server with 16 processing cores.
Part code:
//Cria arquivo de log
FileStream fs = new FileStream("C:\\Logs\\001_log.txt", FileMode.OpenOrCreate, FileAccess.Write, FileShare.Read);
StreamWriter fp = new StreamWriter(fs);
Parallel.For(0, table.Rows.Count, (k) =>
{
try
{
Int32 iReturn = ProcRegister((Int32)table.Rows[0][0]);
fp.WriteLine(iReturn);
fp.Flush();
}
catch{}
finally{}
});
fp.Close();
fp.Dispose();
fs.Dispose();
fs.Close();
Part of the log file with problem:
...
985217
991848
985680
988684
989989
983342
7367 983342 <---
7367 983342 <---
7367 <---
984540
986244
986016
984964
991237
...
There is an error in the code. The parameter should be an Action because you take an int as parameter. As wel var process = delegate() won't compile here. Had to change it to Action process = delegate(). Anyway, you helped me out! Thanks!
Nice post very helpful
dbakings
This does not compile in .NET 3.5, because process cannot be declared a var. I'm unfamiliar enough with delegates to not know what it's type should be to avoid this error.
have a look at bit, 5 bit of, 6 small tens. https://imgur.com/a/PA5esbP https://imgur.com/a/Sv4XuMF https://imgur.com/a/CADoHta https://imgur.com/a/PQsSYr3 https://imgur.com/a/RZZHbW6 https://imgur.com/a/x4b1RnY https://imgur.com/a/e4cE6wJ
Post a Comment