交互开发,八天玩转并行开发

1、Parallel.Invoke
最主要用以职务的并行
  那个函数的作用和Task有个别相似,就是出现执行1多样职分,然后等待全体完结。和Task比起来,省略了Task.WaitAll这一步,自然也缺少了Task的相干线管道理成效。它有三种样式:
  Parallel.Invoke( params Action[] actions);
  Parallel.Invoke(Action[] actions,TaskManager
manager,TaskCreationOptions options);

System.Threading.Tasks.Parallel类提供了Parallel.Invoke,Parallel.For,Parallel.ForEach那八个静态方法。

 

 随着多核时期的来临,并行开发特别浮现出它的兵不血刃威力,像大家这么的码农再也不用过多的关切底层线程的兑现和手工业控制,

亚洲必赢官网 1亚洲必赢官网 2

1 Parallel.Invoke

     
随着多核时代的赶到,并行开发尤其彰显出它的有力威力,像大家如此的码农再也不用过多的关怀底层线程的达成和手工业控制,

要打听并行开发,须要先领悟下三个概念:“硬件线程”和“软件线程”。

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            var actions = new Action[]
            {
                () => ActionTest("test 1"),
                () => ActionTest("test 2"),
                () => ActionTest("test 3"),
                () => ActionTest("test 4")
            };

            Console.WriteLine("Parallel.Invoke 1 Test");
            Parallel.Invoke(actions);

            Console.WriteLine("结束!");
        }

        static void ActionTest(object value)
        {
            Console.WriteLine(">>> thread:{0}, value:{1}",
            Thread.CurrentThread.ManagedThreadId, value);
        }
    }
}

尽可能并行执行所提供的各类操作,除非用户撤消了操作。

要通晓并行开发,供给先领悟下三个概念:“硬件线程”和“软件线程”。

 

Program

方法:

 

  1. 硬件线程

2、For方法,主要用来拍卖针对数组成分的并行操作(多少的相互

1)public static void Invoke(params
Action[] actions);

  1. 硬件线程

   
相信我们手头的电脑都以双核以上的,像作者那样古董的微型总计机都以双核的,那样的双核叫做物理基础。

亚洲必赢官网 3亚洲必赢官网 4

2)public static void
Invoke(ParallelOptions parallelOptions,

   
相信咱们手头的微型计算机都以双核以上的,像自家如此古董的微处理器都以双核的,那样的双核叫做物理基础。

亚洲必赢官网 5

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            Parallel.For(0, nums.Length, (i) =>
            {
                Console.WriteLine("针对数组索引{0}对应的那个元素{1}的一些工作代码……ThreadId={2}", i, nums[i], Thread.CurrentThread.ManagedThreadId);
            });
            Console.ReadKey();
        }
    }
}

params Action[] actions);

亚洲必赢官网 6

 

Program

参数:

 

硬件线程又称作逻辑内核,大家得以在”任务管理器“中查看”质量“标签页,如下图,大家清楚有3个硬件线程。

三、Foreach方法,重要用于拍卖泛型集合成分的并行操作(数码的交互)

parallelOptions:1个指标,用于配置此操作的行事。

硬件线程又称之为逻辑内核,大家得以在”任务管理器“中查阅”品质“标签页,如下图,大家掌握有贰个硬件线程。

 亚洲必赢官网 7

亚洲必赢官网 8亚洲必赢官网 9

Actions:要执行的操作数组

 亚洲必赢官网 10

 

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            Parallel.ForEach(nums, (item) =>
            {
                Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", item, Thread.CurrentThread.ManagedThreadId);
            });
            Console.ReadKey();
        }
    }
}

异常:

 

诚如情况下,一个大体基础对应贰个逻辑内核,比如自身那里的二对二。当然假如您的cpu采纳的是超线程技术,那么恐怕就会有五个大体基本对应

Program

对方法1:

1般景色下,叁个大体基础对应叁个逻辑内核,比如自身那里的贰对二。当然假设您的cpu选用的是超线程技术,那么可能就会有四个大体基本对应

捌个硬件线程,未来有诸多服务器都有几个硬件线程,早上在集团的服务器上截了个图。

  数据的并行的艺术二(AsParallel()):

    System.ArgumentNullException:
actions 参数为 null。

八个硬件线程,现在有好多服务器都有八个硬件线程,中午在小卖部的服务器上截了个图。

亚洲必赢官网 11

亚洲必赢官网 12亚洲必赢官网 13

    System.AggregateException:当 actions
数组中的任何操作引发这么些时引发的丰硕。

亚洲必赢官网 14

大家要掌握相互开发要做的事情就是将职分分摊给那一个硬件线程去并行执行来达到负载和增长速度。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            var evenNumbers = nums.AsParallel().Select(item => Calculate(item));
            //注意这里是个延迟加载,也就是不用集合的时候 这个Calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果;
            Console.WriteLine(evenNumbers.Count());
            //foreach (int item in evenNumbers)
            //    Console.WriteLine(item);
            Console.ReadKey();
        }

        static int Calculate(int number)
        {
            Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId);
            return number * 2;
        }
    }
}

System.ArgumentException:actions数组包涵null 个成分。

咱俩要理解互相开发要做的事情正是将职责分摊给这么些硬件线程去并行执行来完结负载和加快。

 

Program

对章程二除上述极度外还包涵:

 

  1. 软件线程

  .AsOrdered() 对结果实行排序:

System.OperationCanceledException:parallelOptions
设置了System.Threading.CancellationToken。

  1. 软件线程

   
相信那个大家最熟识了,大家知道守旧的代码都以串行的,就四个主线程,当我们为了达成加快而开了重重工作线程,这个工作线程

亚洲必赢官网 15亚洲必赢官网 16

System.ObjectDisposedException:在
parallelOptions 中与 System.Threading.CancellationToken
关联的System.Threading.CancellationTokenSource已被放出。

   
相信这么些大家最熟练了,大家掌握守旧的代码都以串行的,就2个主线程,当我们为了达成加快而开了无数做事线程,那些工作线程

也正是软件线程。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp
{

    class Program
    {
        static void Main(string[] args)
        {
            List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            var evenNumbers = nums.AsParallel().AsOrdered().Select(item => Calculate(item));
            //注意这里是个延迟加载,也就是不用集合的时候 这个Calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果;
            //Console.WriteLine(evenNumbers.Count());
            foreach (int item in evenNumbers)
                Console.WriteLine(item);
            Console.ReadKey();
        }

        static int Calculate(int number)
        {
            Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId);
            return number * 2;
        }
    }
}

说明:

也便是软件线程。

 

Program

一)Invoke方法唯有在actions全体实践完才会回来,即便在实行进程中出现很是也会做到。

 

好,大家驾驭了基本概念就ok了,在.net
四.0中,微软给我们提供了三个新的命名空间:System.Threading.Tasks。那个中有那多少个好玩

  ForEach的亮点正是能够将数据举行分区,每贰个小区内落成串行计算,分区选用Partitioner.Create达成。

二)无法保险actions中的全数操作同时执行。比如actions大小为四,但硬件线程数为贰,那么与此同时运营的操作数最多为贰。

好,我们知晓了基本概念就ok了,在.net
四.0中,微软给我们提供了三个新的命名空间:System.Threading.Tasks。那当中有诸多好玩

的东西,作为第2篇就介绍下最基础,最简易的Parallel的应用。

亚洲必赢官网 17亚洲必赢官网 18

交互开发,八天玩转并行开发。三)actions中的操作并行的运行且与各种无关,若编写与运作顺序有关的产出代码,应慎选任何办法。

的东西,作为第2篇就介绍下最基础,最简便易行的Parallel的利用。

 

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            for (int j = 1; j < 4; j++)
            {
                ConcurrentBag<int>  bag = new ConcurrentBag<int>();
                var watch = Stopwatch.StartNew();
                watch.Start();
                Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
                {
                    for (int m = i.Item1; m < i.Item2; m++)
                    {
                        bag.Add(m);
                    }
                });
                Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
                GC.Collect();

            }
        }
    }
}

4)要是运用Invoke加载四个操作,八个操作运营时刻迥异,总的运营时刻以消耗费时间间最长操作为基准,那会招致千千万万逻辑内核长期处于空闲状态。

 

亚洲必赢官网 19

Program

5)受限的并行可增加性,那缘于Invoke所调用的嘱托数目是一定的。

亚洲必赢官网 20

 

  ParallelOptions类
  ParallelOptions options = new ParallelOptions();
  //钦赐使用的硬件线程数为肆
  options.MaxDegreeOfParallelism = 4;
  有时候我们的线程大概会跑遍全部的内核,为了增强别的应用程序的安定,就要限量加入的基本,正好ParallelOptions提供了马克斯DegreeOfParallelism属性。

2 Parallel.For

 

一: Parallel的使用

亚洲必赢官网 21亚洲必赢官网 22

兴许会相互运营迭代,能够监视和操作循环的情事。Parallel.For有两个重载的主意,下边列举部分方式。

一: Parallel的使用

在Parallel上边有多少个常用的艺术invoke,for和forEach。

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class Student
    {
        public int ID { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
        public DateTime CreateTime { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var dic = LoadData();
            Stopwatch watch = new Stopwatch();
            watch.Start();
            var query2 = (from n in dic.Values.AsParallel()
                          where n.Age > 20 && n.Age < 25
                          select n).ToList();
            watch.Stop();
            Console.WriteLine("并行计算耗费时间:{0}", watch.ElapsedMilliseconds);

            Console.Read();
        }

        public static ConcurrentDictionary<int, Student> LoadData()
        {
            ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();
            ParallelOptions options = new ParallelOptions();
            //指定使用的硬件线程数为4
            options.MaxDegreeOfParallelism = 4;
            //预加载1500w条记录
            Parallel.For(0, 15000000, options, (i) =>
            {
                var single = new Student()
                {
                    ID = i,
                    Name = "hxc" + i,
                    Age = i % 151,
                    CreateTime = DateTime.Now.AddSeconds(i)
                };
                dic.TryAdd(i, single);
            });

            return dic;
        }
    }
}

方法:

在Parallel下边有八个常用的方法invoke,for和forEach。

1:  Parallel.Invoke

Program

1)public static ParallelLoopResult
For(int fromInclusive, int toExclusive, Action<int> body);

1:  Parallel.Invoke

    那是最简单易行,最精简的将串行的代码并行化。

广阔难点的拍卖

2)public static ParallelLoopResult
For(int fromInclusive, int toExclusive, Action<int,
ParallelLoopState> body);

    那是最简易,最精简的将串行的代码并行化。

亚洲必赢官网 23😉

  <1> 怎样中途退出并行循环?
  是的,在串行代码中大家break一下就解决了,不过相互就不是这么简单了,但是没什么,在互动循环的信托参数中提供了贰个ParallelLoopState,该实例提供了Break和Stop方法来帮大家达成。
  Break:
当然这么些是打招呼并行计算尽快的淡出循环,比如并行总结正在迭代拾0,那么break后先后还会迭代全部小于100的。

3)public static ParallelLoopResult
For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions,
Action<int, ParallelLoopState> body);

 1 class Program
 2 {
 3     static void Main(string[] args)
 4     {
 5         var watch = Stopwatch.StartNew();
 6 
 7         watch.Start();
 8 
 9         Run1();
10 
11         Run2();
12 
13         Console.WriteLine("我是串行开发,总共耗时:{0}\n", watch.ElapsedMilliseconds);
14 
15         watch.Restart();
16 
17         Parallel.Invoke(Run1, Run2);
18 
19         watch.Stop();
20 
21         Console.WriteLine("我是并行开发,总共耗时:{0}", watch.ElapsedMilliseconds);
22 
23         Console.Read();
24     }
25 
26     static void Run1()
27     {
28         Console.WriteLine("我是任务一,我跑了3s");
29         Thread.Sleep(3000);
30     }
31 
32     static void Run2()
33     {
34         Console.WriteLine("我是任务二,我跑了5s");
35         Thread.Sleep(5000);
36     }
37 }
 1 class Program
 2 {
 3     static void Main(string[] args)
 4     {
 5         var watch = Stopwatch.StartNew();
 6 
 7         watch.Start();
 8 
 9         Run1();
10 
11         Run2();
12 
13         Console.WriteLine("我是串行开发,总共耗时:{0}\n", watch.ElapsedMilliseconds);
14 
15         watch.Restart();
16 
17         Parallel.Invoke(Run1, Run2);
18 
19         watch.Stop();
20 
21         Console.WriteLine("我是并行开发,总共耗时:{0}", watch.ElapsedMilliseconds);
22 
23         Console.Read();
24     }
25 
26     static void Run1()
27     {
28         Console.WriteLine("我是任务一,我跑了3s");
29         Thread.Sleep(3000);
30     }
31 
32     static void Run2()
33     {
34         Console.WriteLine("我是任务二,我跑了5s");
35         Thread.Sleep(5000);
36     }
37 }

  Stop:那几个就不一样等了,比如正在迭代拾0黑马遇上stop,那它啥也随便了,间接退出。

4)public static ParallelLoopResult
For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions
parallelOptions, Func<TLocal> localInit, Func<int,
ParallelLoopState, TLocal, TLocal> body, Action<TLocal>
localFinally);

亚洲必赢官网 24

亚洲必赢官网 25😉

亚洲必赢官网 26亚洲必赢官网 27

参数:

在那个例子中得以获得二点消息:

亚洲必赢官网 28

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            ConcurrentBag<int> bag = new ConcurrentBag<int>();

            Parallel.For(0, 20000000, (i, state) =>
            {
                if (bag.Count == 1000)
                {
                    //state.Break();
                    state.Stop();
                    return;
                }
                bag.Add(i);
            });

            Console.WriteLine("当前集合有{0}个元素。", bag.Count);

        }
    }
}

fromInclusive:早先索引(含)。

第3:二个职务是能够分解成多个任务,选取分而治之的研商。

在这几个例子中得以获得2点新闻:

Program

toExclusive:甘休索引(不含)。

第三:尽可能的幸免子任务之间的依靠,因为子义务是并行执行,所以就从不什么人一定在前,什么人一定在后的鲜明了。

先是:三个义务是能够分解成四个任务,选取分而治之的构思。

  取消(cancel)

body:将被每一种迭代调用二次的寄托。

 

第一:尽可能的防止子义务之间的注重,因为子职责是并行执行,所以就从不谁一定在前,什么人一定在后的规定了。

亚洲必赢官网 29亚洲必赢官网 30

parallelOptions:一个目标,用于配置此操作的行为。

2:Parallel.for

 

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        public static void Main()
        {

            var cts = new CancellationTokenSource();
            var ct = cts.Token;
            Task.Factory.StartNew(() => fun(ct));
            Console.ReadKey();
            //Thread.Sleep(3000);
            cts.Cancel();
            Console.WriteLine("任务取消了!");

        }

        static void fun(CancellationToken token)
        {
            Parallel.For(0, 100000,
                        new ParallelOptions { CancellationToken = token },
                        (i) =>
                        {
                            Console.WriteLine("针对数组索引{0}的一些工作代码……ThreadId={1}", i, Thread.CurrentThread.ManagedThreadId);
                        });
        }
    }
}

localInit:一个委托,用于再次来到每一个任务的地面数据的上马状态。

 我们知晓串行代码中也有1个for,可是10分for并未用到多核,而Paraller.for它会在尾部依照硬件线程的运营情况来尽量的选取具有的可

2:Parallel.for

Program

localFinally:三个寄托,用于对种种职分的本地意况执行3个末段操作。

应用的硬件线程,注意那里的Parallel.for的步行是壹。

 咱们知道串行代码中也有二个for,但是充足for并不曾用到多核,而Paraller.for它会在底层依据硬件线程的运营情状来尽量的使用全体的可

  <贰> 并行总括中抛出十三分怎么处理?
  首先义务是并行总计的,处理进程中可能会时有发生n多的不得了,那么怎样来收获到那些13分呢?普通的Exception并不能获得到格外,不过为互相诞生的AggregateExcepation就足以博得到一组尤其。

再次回到结果:

此间我们来演示一下,向八个线程安全的集纳插入数据,当然这么些集合选取原子性来兑现线程同步,比那么些重量级的锁机制特别的节约消耗。

选择的硬件线程,注意那里的Parallel.for的步行是1。

亚洲必赢官网 31亚洲必赢官网 32

ParallelLoopResult
:蕴含关于已形成的巡回部分的新闻。

 1  class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             for (int j = 1; j < 4; j++)
 6             {
 7                 Console.WriteLine("\n第{0}次比较", j);
 8 
 9                 ConcurrentBag<int> bag = new ConcurrentBag<int>();
10 
11                 var watch = Stopwatch.StartNew();
12 
13                 watch.Start();
14 
15                 for (int i = 0; i < 20000000; i++)
16                 {
17                     bag.Add(i);
18                 }
19 
20                 Console.WriteLine("串行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
21 
22                 GC.Collect();
23 
24                 bag = new ConcurrentBag<int>();
25 
26                 watch = Stopwatch.StartNew();
27 
28                 watch.Start();
29 
30                 Parallel.For(0, 20000000, i =>
31                 {
32                     bag.Add(i);
33                 });
34 
35                 Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
36 
37                 GC.Collect();
38 
39             }
40         }
41     }

那边大家来演示一下,向2个线程安全的聚合插入数据,当然那么些集合接纳原子性来落到实处线程同步,比那贰个重量级的锁机制越发的节约消耗。

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                Parallel.Invoke(Run1, Run2);
            }
            catch (AggregateException ex)
            {
                foreach (var single in ex.InnerExceptions)
                {
                    Console.WriteLine(single.Message);
                }
            }
            Console.WriteLine("结束了!");
            //Console.Read();
        }

        static void Run1()
        {
            Thread.Sleep(3000);
            throw new Exception("我是任务1抛出的异常");
        }

        static void Run2()
        {
            Thread.Sleep(5000);
            throw new Exception("我是任务2抛出的异常");
        }
    }
}

异常:

亚洲必赢官网 33

亚洲必赢官网 34😉

Program

System.ArgumentNullException:body 参数为
null,或 localInit 参数为 null,或 localFinally 参数为 null,或
parallelOptions 参数为 null。
System.AggregateException:包括在颇具线程上吸引的满贯单个很是的相当。

 

 1  class Program  2     {  3         static void Main(string[] args)  4         {  5             for (int j = 1; j < 4; j++)  6             {  7                 Console.WriteLine("\n第{0}次比较", j);  8   9                 ConcurrentBag<int> bag = new ConcurrentBag<int>(); 10  11                 var watch = Stopwatch.StartNew(); 12  13                 watch.Start(); 14  15                 for (int i = 0; i < 20000000; i++) 16                 { 17                     bag.Add(i); 18                 } 19  20                 Console.WriteLine("串行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds); 21  22                 GC.Collect(); 23  24                 bag = new ConcurrentBag<int>(); 25  26                 watch = Stopwatch.StartNew(); 27  28                 watch.Start(); 29  30                 Parallel.For(0, 20000000, i => 31                 { 32                     bag.Add(i); 33                 }); 34  35                 Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds); 36  37                 GC.Collect(); 38  39             } 40         } 41     }

  注意Parallel里面 不提议抛出非凡因为在最为的情形下比如进入的首先批线程先都抛极度了
此时AggregateExcepation就只可以捕获到这一堆的不当,然后程序就与世长辞了

对于措施3)和四)除含有以上卓殊外还包蕴:

能够看的出,加快的法力如故比较显然的。

亚洲必赢官网 35😉

亚洲必赢官网 36亚洲必赢官网 37

System.OperationCanceledException:在
parallelOptions 设置了参数 System.Threading.CancellationToken。

 

亚洲必赢官网 38

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class TestClass
    {
        public static List<int> NumberList = null;
        private static readonly object locker = new object();
        public void Test(int Number)
        {
            throw new Exception("1111");
            //lock (locker)
            //{
            //    if (NumberList == null)
            //    {
            //        Console.WriteLine("执行添加");
            //        NumberList = new List<int>();
            //        NumberList.Add(1);
            //        //Thread.Sleep(1000);
            //    }
            //}
            //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number));
            //Console.WriteLine(Number);
        }
    }

    class Program
    {
        private static readonly object locker = new object();
        static void Main(string[] args)
        {
            List<string> errList = new List<string>();
            try
            {
                Parallel.For(0, 10, (i) =>
                {
                    try
                    {
                        TestClass a = new TestClass();
                        a.Test(i);
                    }
                    catch (Exception ex)
                    {
                        lock (locker)
                        {
                            errList.Add(ex.Message);
                            throw ex;
                        }
                    }
                });
            }
            catch (AggregateException ex)
            {
                foreach (var single in ex.InnerExceptions)
                {
                    Console.WriteLine(single.Message);
                }
            }
            int Index = 1;
            foreach (string err in errList)
            {
                Console.WriteLine("{0}、的错误:{1}", Index++, err);
            }
        }
    }
}

System.ObjectDisposedException:在
parallelOptions 中与 System.Threading.CancellationToken 关联的
System.Threading.CancellationTokenSource已被释放。

3:Parallel.forEach
   
forEach的助益就是能够将数据开始展览分区,每四个小区内达成串行总括,分区选择Partitioner.Create完成。

 

Program

说明:

 class Program
    {
        static void Main(string[] args)
        {
            for (int j = 1; j < 4; j++)
            {
                Console.WriteLine("\n第{0}次比较", j);

                ConcurrentBag<int> bag = new ConcurrentBag<int>();

                var watch = Stopwatch.StartNew();

                watch.Start();

                for (int i = 0; i < 3000000; i++)
                {
                    bag.Add(i);
                }

                Console.WriteLine("串行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);

                GC.Collect();

                bag = new ConcurrentBag<int>();

                watch = Stopwatch.StartNew();

                watch.Start();

                Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
                {
                    for (int m = i.Item1; m < i.Item2; m++)
                    {
                        bag.Add(m);
                    }
                });

                Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);

                GC.Collect();

            }
        }
    }

能够看的出,加速的效应如故比较分明的。

  能够向下边这样来拍卖一下
  不在AggregateExcepation中来处理 而是在Parallel里面包车型地铁try
catch来记录错误,或处理错误

一)不扶助浮点和步进。

亚洲必赢官网 39

 

亚洲必赢官网 40亚洲必赢官网 41

二)不恐怕有限支撑迭代的实施顺序。

那边依旧要说一下:Partitioner.Create(0, 三千000)。

三:Parallel.forEach    
forEach的优点就是能够将数据开始展览分区,每2个小区内完毕串行总计,分区接纳Partitioner.Create完成。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class TestClass
    {
        public static List<int> NumberList = null;
        private static readonly object locker = new object();
        public void Test(int Number)
        {
            throw new Exception("1111");
            //lock (locker)
            //{
            //    if (NumberList == null)
            //    {
            //        Console.WriteLine("执行添加");
            //        NumberList = new List<int>();
            //        NumberList.Add(1);
            //        //Thread.Sleep(1000);
            //    }
            //}
            //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number));
            //Console.WriteLine(Number);
        }
    }

    class Program
    {
        private static readonly object locker = new object();
        static void Main(string[] args)
        {
            List<string> errList = new List<string>();
            Parallel.For(0, 10, (i) =>
            {
                try
                {
                    TestClass a = new TestClass();
                    a.Test(i);
                }
                catch (Exception ex)
                {
                    lock (locker)
                    {
                        errList.Add(ex.Message);
                    }
                    //Console.WriteLine(ex.Message);
                    //注:这里不再将错误抛出.....
                    //throw ex;
                }
            });

            int Index = 1;
            foreach (string err in errList)
            {
                Console.WriteLine("{0}、的错误:{1}", Index++, err);
            }
        }
    }
}

三)固然fromInclusive大于或等于toExclusive,方法登时赶回而不会执行别的迭代。

率先:大家要分区的界定是0-三千000。

亚洲必赢官网 42😉

Program

四)对于body参数中包蕴的ParallelLoopState实例,其作用为提前中断互相循环。

其次:我们必然想精晓系统给大家分了多少个区?
很不满,那是系统之中协调的,无权告诉大家,当然系统也不反对大家团结内定分区个数,

 class Program     {         static void Main(string[] args)         {             for (int j = 1; j < 4; j++)             {                 Console.WriteLine("\n第{0}次比较", j);
                ConcurrentBag<int> bag = new ConcurrentBag<int>();
                var watch = Stopwatch.StartNew();
                watch.Start();
                for (int i = 0; i < 3000000; i++)                 {                     bag.Add(i);                 }
                Console.WriteLine("串行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
                GC.Collect();
                bag = new ConcurrentBag<int>();
                watch = Stopwatch.StartNew();
                watch.Start();
                Parallel.ForEach(Partitioner.Create(0, 3000000), i =>                 {                     for (int m = i.Item1; m < i.Item2; m++)                     {                         bag.Add(m);                     }                 });
                Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
                GC.Collect();
            }         }     }

 

伍)只有在迭代全部到位以往才会重返结果,不然循环将平素不通。

       
那里能够使用Partitioner.Create的第七个重载,比如这样:Partitioner.Create(0,
三千000, Environment.ProcessorCount),

亚洲必赢官网 43😉

 

       
因为 Environment.ProcessorCount能够获得到最近的硬件线程数,所以这边也就开了2个区。

亚洲必赢官网 44

3 Parallel.ForEach

 

那边依旧要说一下:Partitioner.Create(0, 三千000)。

方法

上边分享下并行总括中大家兴许部分疑心?

率先:大家要分区的限定是0-三千000。

1)public static ParallelLoopResult
ForEach(IEnumerable<TSource> source, Action<TSource>
body);

<一> 怎么样中途退出并行循环?

第三:大家必定想清楚系统给大家分了多少个区?
很遗憾,那是系统内部协调的,无权告诉大家,当然系统也不反对大家有福同享钦定分区个数,

2)public static ParallelLoopResult
ForEach<TSource>(IEnumerable<TSource> source,
ParallelOptions parallelOptions, Action<TSource,
ParallelLoopState> body);

     
是的,在串行代码中大家break一下就化解了,可是相互就不是这么容易了,然则没什么,在互动循环的信托参数中提供了2个

       
那里能够利用Partitioner.Create的第陆个重载,比如那样:Partitioner.Create(0,
两千000, Environment.ProcessorCount),

3)public static ParallelLoopResult
ForEach<TSource>(Partitioner<TSource> source,
Action<TSource> body);

ParallelLoopState,该实例提供了Break和Stop方法来帮大家贯彻。

       
因为 Environment.ProcessorCount能够赢得到当前的硬件线程数,所以那里也就开了一个区。

参数:

Break:
当然这些是通报并行总括尽快的脱离循环,比如并行计算正在迭代拾0,那么break后先后还会迭代全体小于100的。

 

source:数据源

Stop:那几个就不一致了,比如正在迭代100突然境遇stop,那它啥也不论了,直接退出。

上面分享下并行总结中大家兴许部分疑忌?

body:将被每个迭代调用一遍的寄托。

 

<1> 如何中途退出并行循环?

parallelOptions:一个目标,用于配置此操作的行事。

下边举个例证,当迭代到一千的时候退出循环

     
是的,在串行代码中大家break一下就化解了,不过相互就不是这样不难了,可是没什么,在竞相循环的嘱托参数中提供了二个

回去结果:

 1   class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             var watch = Stopwatch.StartNew();
 6 
 7             watch.Start();
 8 
 9             ConcurrentBag<int> bag = new ConcurrentBag<int>();
10 
11             Parallel.For(0, 20000000, (i, state) =>
12                   {
13                       if (bag.Count == 1000)
14                       {
15                           state.Break();
16                           return;
17                       }
18                       bag.Add(i);
19                   });
20 
21             Console.WriteLine("当前集合有{0}个元素。", bag.Count);
22 
23         }
24     }

ParallelLoopState,该实例提供了Break和Stop方法来帮大家贯彻。

ParallelLoopResult
:包括关于已到位的轮回部分的新闻。

亚洲必赢官网 45

Break:
当然那个是通报并行计算尽快的淡出循环,比如并行总结正在迭代100,那么break后程序还会迭代全数小于十0的。

异常:

 

Stop:那么些就不1样了,比如正在迭代十0爆冷门境遇stop,那它啥也随便了,直接退出。

System.ArgumentNullException:source
参数为 null。-或- 方body 参数为 null。

<二> 并行总结中抛出尤其怎么处理?

 

System.AggregateException:包括了颇具线程上引发的整整单个分外。

 首先职分是并行计算的,处理进度中或许会发生n多的老大,那么什么样来获得到这个越发呢?普通的Exception并不可能取获得拾分,不过为互相诞生的AggregateExcepation就足以得到到1组万分。

下边举个例证,当迭代到1000的时候退出循环

对此艺术2)还包括:

class Program
{
    static void Main(string[] args)
    {
        try
        {
            Parallel.Invoke(Run1, Run2);
        }
        catch (AggregateException ex)
        {
            foreach (var single in ex.InnerExceptions)
            {
                Console.WriteLine(single.Message);
            }
        }

        Console.Read();
    }

    static void Run1()
    {
        Thread.Sleep(3000);
        throw new Exception("我是任务1抛出的异常");
    }

    static void Run2()
    {
        Thread.Sleep(5000);

        throw new Exception("我是任务2抛出的异常");
    }
}

亚洲必赢官网 46😉

System.OperationCanceledException:在
parallelOptions 设置了参数 System.Threading.CancellationToken。

亚洲必赢官网 47

 1   class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             var watch = Stopwatch.StartNew();
 6 
 7             watch.Start();
 8 
 9             ConcurrentBag<int> bag = new ConcurrentBag<int>();
10 
11             Parallel.For(0, 20000000, (i, state) =>
12                   {
13                       if (bag.Count == 1000)
14                       {
15                           state.Break();
16                           return;
17                       }
18                       bag.Add(i);
19                   });
20 
21             Console.WriteLine("当前集合有{0}个元素。", bag.Count);
22 
23         }
24     }

System.ObjectDisposedException:在
parallelOptions 中与 System.Threading.CancellationToken 关联的
System.Threading.CancellationTokenSource已被假释。

 

亚洲必赢官网 48😉

对此三)包涵的不行为:

<三> 并行计算中本人可以留3个硬件线程出来啊?

亚洲必赢官网 49

System.ArgumentNullException:source
参数为 null。-或- 方body 参数为 null。

 
暗许的事态下,底层机制会尽力而为多的选用硬件线程,但是大家使用手动钦赐的功利是大家得以在二,四,几个硬件线程的意况下来实行衡量加快比。

 

System.InvalidOperationException:source
分区程序中的
System.Collections.Concurrent.Partitioner<TSource>.SupportsDynamicPartitions
属性重回 false。或 在 source 分区主次中的任何措施再次回到 null
时引发这一个。或在source 分区先后中的
System.Collections.Concurrent.Partitioner<TSource>.GetPartitions(System.Int3贰)方法不回去正确数指标分区。

 class Program
    {
        static void Main(string[] args)
        {
            var bag = new ConcurrentBag<int>();

            ParallelOptions options = new ParallelOptions();

            //指定使用的硬件线程数为1
            options.MaxDegreeOfParallelism = 1;

            Parallel.For(0, 300000, options, i =>
            {
                bag.Add(i);
            });

            Console.WriteLine("并行计算:集合有:{0}", bag.Count);

        }
    }

<二> 并行总括中抛出相当怎么处理?

说明:

 

 首先职务是并行总计的,处理进度中恐怕会时有发生n多的要命,那么什么样来获取到这一个很是呢?普通的Exception并不能够赢获得那多少个,可是为相互诞生的AggregateExcepation就足以获取到一组尤其。

一)对于body参数中带有的ParallelLoopState实例,其作用为提前中断相互循环。

————————————————————————————————————————————————————————————

亚洲必赢官网 50😉

二)Parallel.ForEach方法不保证实施各类,它不像foreach循环那样总是各类执行。

————————————————————————————————————————————————————————————

class Program {     static void Main(string[] args)     {         try         {             Parallel.Invoke(Run1, Run2);         }         catch (AggregateException ex)         {             foreach (var single in ex.InnerExceptions)             {                 Console.WriteLine(single.Message);             }         }
        Console.Read();     }
    static void Run1()     {         Thread.Sleep(3000);         throw new Exception("我是任务1抛出的异常");     }
    static void Run2()     {         Thread.Sleep(5000);
        throw new Exception("我是任务2抛出的异常");     } }

叁)对于艺术叁)中的source,它的体系是Partitioner<TSource>。能够利用Partitioner.Create方法创制分区,该情势的多少个规整方法为:

友谊提示:假设不希罕看小说,能够移动本体系的  C#亚洲必赢官网,IL解读完整录像 【一把伞的钱哦亚洲必赢官网 51

亚洲必赢官网 52😉

l public static
OrderablePartitioner<Tuple<int, int>> Create(int
fromInclusive, int toExclusive);

————————————————————————————————————————————————————————————

亚洲必赢官网 53

l public static
OrderablePartitioner<Tuple<int, int>> Create(int
fromInclusive, int toExclusive, int rangeSize);

————————————————————————————————————————————————————————————

 

fromInclusive为限量下限(含),toExclusive为限制下限(不含),rangeSize为每一种子范围的大大小小。

<3> 并行总结中自个儿能够留三个硬件线程出来啊?

动用Partitioner创建的子范围大小私下认可大概是总计机基础的叁倍,而当使用rangeSize钦命范围大小时,那么子范围大小为钦命值。

 
暗中认可的动静下,底层机制会尽只怕多的运用硬件线程,但是大家利用手动内定的便宜是大家得以在贰,四,捌个硬件线程的图景下来进行度量加快比。

四)唯有在迭代全体形成之后才会回到结果,不然循环将直接不通。

亚洲必赢官网 54😉

 

 class Program     {         static void Main(string[] args)         {             var bag = new ConcurrentBag<int>();
            ParallelOptions options = new ParallelOptions();
            //指定使用的硬件线程数为1             options.MaxDegreeOfParallelism = 1;
            Parallel.For(0, 300000, options, i =>             {                 bag.Add(i);             });
            Console.WriteLine("并行计算:集合有:{0}", bag.Count);
        }     }

4 ParallelOptions

亚洲必赢官网 55😉

定义:

 

仓库储存选项,用于配置
System.Threading.Tasks.Parallel 类的章程。

ParallelOptions属性

1)public CancellationToken
CancellationToken { get; set; }

获得或设置传播有关应注销操作的布告。

2)public int MaxDegreeOfParallelism {
get; set; }

获取或安装此 ParallelOptions
实例所允许的最大并行度。

3)public TaskScheduler TaskScheduler {
get; set; }

获得或设置与此
System.Threading.Tasks.ParallelOptions 实例关联的
System.Threading.Tasks.TaskScheduler

说明:

一)通过安装CancellationToken来撤销并行循环,当前正值周转的迭代会执行完,然后抛出System.OperationCanceledException类型的百般。

贰)TPL的办法总是会试图动用具有可用内核以达到最佳的成效,可是很大概.NET
Framework内部采用的启发式算法所获得的流入和平运动用的线程数比其实必要的多(经常都会压倒硬件线程数,那样会越来越好地扶助CPU和I/O混合型的劳作负荷)。

万般将最大并行度设置为小于等于逻辑内核数。假如设置为等于逻辑内核数,那么要保管不会潜移默化别的程序的履行。设置为小于逻辑内核数是为着有空闲内核来处理其余迫切的天职。

用途:

1)从循环外部废除并行循环

2)钦命并行度

三)钦定自定义职分调度程序

5 ParallelLoopState

定义:

可使并行循环迭代与其它迭代交互。
此类的实例由
Parallel 类提须求各样循环;不可能在用户代码中开创实例。

方法:

一)Break()方法:公告并行循环在实施完当前迭代过后尽快平息实施,可确认保障低索引步骤实现。且可有限支撑正在进行的迭代继续运营直到达成。

二)Stop()方法:文告并行循环尽快平息实施。对于尚未运维的迭代不可能会尝试进行低索引迭代。不保障全数已运营的迭代都推行完。

用途:提前退出并行循环。

说明:

一)无法而且在同2个互为循环中还要利用Break和Stop。

2)Stop比Break更常用。break语句用在相互循环中的效果和用在串行循环中不一致。Break用在相互循环中,委托的关键性方法在每回迭代的时候被调用,退出委托的基本点方法对相互循环的进行没有影响。Stop结束循环比Break快。

6 ParallelLoopResult结构

定义:

互相循环运行结果的音讯。

属性:

1)public bool IsCompleted { get;
}

假如该循环已运转实现(该循环的富有迭代均已实施,并且该循环未有接到提前甘休的乞求),则为
true;不然为 false。

2)public long? LowestBreakIteration {
get; }

回到一个意味着从中调用 Break
语句的最低迭代的整数

用途:判定当相互循环甘休时,是不是因调用了break方法或stop方法而提前退出并行循环,或具备迭代均已执行。

判定根据:

条件

 

IsCompleted

运行完成

!IsCompleted &&

LowestBreakIteration==null

使用了Stop语句而提前终止

!IsCompleted &&

LowestBreakIteration!=null

使用了Break语句而提前终止

 

七 捕获并行循环中的非常

原则:

一)很是优先于从循环外部撤销和动用Break()方法或Stop()方法提前退出并行循环。

二)并行循环体抛出3个未处理的老大,并行循环就不能再起来新的迭代。

叁)暗中认可情况下当某次迭代抛出一个未处理相当,那么正在执行的迭代假设没抛出至极,正在实施的迭代会执行完。当全部迭代都实施完(有相当的大希望别的的迭代在实践的进度中也抛出非凡),并行循环将在调用它的线程中抛出十分。

相互之间循环运维的经过中,也许有多少个迭代抛出尤其,所以1般选取AggregateException来捕获万分。AggregateException继承自Exception。为了防范仅使用AggregateException未能捕获有些至极,使用AggregateException的还要还要使用Exception。

八 行使情势

 

8.1 Parallel.Invoke

 1 public static void DemonstrateInvoke()
 2 {
 3     //使用Lambda
 4     Parallel.Invoke(
 5     () =>
 6     {
 7         //具体操作1
 8     }, 
 9     () => 
10     {
11         //具体操作2
12     });
13 
14     //不使用lambda
15     Parallel.Invoke(Operation1, Operation2);
16 }
17 
18 private static void Operation1()
19 {
20     //具体操作1
21 }
22 
23 private static void Operation2()
24 {
25     //具体操作2
26 }        

 

8.2 Parallel.For

1 串行循环:
2 int toExclusive = ...;
3 for(int i =0;i<=toExclusive;i++){};
4 
5 对应的并行循环:
6 Parallel.For(0, toExclusive+1, (i) => 
7 {
8     //具体操作
9 });

8.3 Parallel.ForEach

 1 一般用法
 2 IEnumerable<string> coll = ...;
 3 Parallel.ForEach(coll,(str)=>
 4 {
 5     //具体操作
 6 });
 7 
 8 基于分区的模式
 9 优化分区数,使其最接近系统逻辑内核数:
10 子分区范围 = 对“待处理集合大小/系统逻辑内核数”取整+1。
11 int logicalCores =...;
12 IEnumerable<string> collP = ...;
13 int fromInclusive = ...;
14 int toExclusiv = ...;
15 int rangeSize = (int)((toExclusiv-fromInclusive )/logicalCores) +1;
16 Parallel.ForEach(Partitioner.Create(fromInclusive, toExclusiv, rangeSize), range =>
17 {
18     for (int i = range.Item1; i < range.Item2; i++)
19     {
20         //使用集合:collection[i]
21     }
22 });    

八.四 从循环外部撤废并行循环

留神:不行使IsCancellationRequested或ThrowIfCancellationRequested()的意况下不可能捕获类型为AggregateException的老大。

1)对于Parallel.For

使用IsCancellationRequested属性

 1 public static void CancelFromExternal()
 2 {
 3     CancellationTokenSource cts = new CancellationTokenSource();
 4     //其他操作...
 5 
 6     //异步执行Operation方法
 7     Task.Factory.StartNew(()=>{Operation(cts);});
 8     //异步执行condition的计算过程
 9     Task.Factory.StartNew(()=>{
10         bool condition = ...;
11         if (condition) cts.Cancel();
12     }
13 
14     //其他操作...
15 }
16 
17 private static void Operation(CancellationTokenSource cts)
18 {
19     CancellationToken ct = cts.Token;
20     ParallelOptions op = new ParallelOptions { CancellationToken = ct };
21     int toExclusive = ...;
22     Parallel.For(0, toExclusive, op, (i) =>
23     {
24 
25         //其他操作...
26 
27         //return只对当前子线程有效
28         if (ct.IsCancellationRequested)
29         { return; }
30 
31         //其他操作...
32     });
33 }                    

使用ThrowIfCancellationRequested()方法抛出非凡

将上边的竞相循环部分替换为下边包车型客车代码:

 

1 Parallel.For(0, toExclusive, op, (i) =>
2 {
3 
4     //其他操作...
5 
6     ct.ThrowIfCancellationRequested();
7 
8     //其他操作...
9 });

 

不使用IsCancellationRequested和ThrowIfCancellationRequested()方法

将Operation方法中的涉及到IsCancellationRequested和ThrowIfCancellationRequested()方法的代码去掉

2)对于Parallel.ForEach

使用IsCancellationRequested属性

 1 public static void CancelFromExternal()
 2 {
 3     //同1)中CancelFromExternal方法
 4 }
 5 
 6 private static void Operation(CancellationTokenSource cts)
 7 {
 8     CancellationToken ct = cts.Token;
 9     ParallelOptions op = new ParallelOptions { CancellationToken = ct };
10     IEnumerable<string> coll = new List<string> { "str1", "str2" };
11     Parallel.ForEach(coll, op,(str, loopState) =>
12     {
13         //其他操作...
14 
15       //return只对当前子线程有效
16       if (ct.IsCancellationRequested)
17       { return; }
18 
19       //其他操作...
20   });
21 }

使用ThrowIfCancellationRequested()方法抛出相当

将Operation方法中的:

if (ct.IsCancellationRequested)

   { return; }

替换为:

ct.ThrowIfCancellationRequested();

不使用IsCancellationRequested和ThrowIfCancellationRequested()方法

将Operation方法中的涉及到IsCancellationRequested和ThrowIfCancellationRequested()方法的代码去掉

捌.伍 钦点并行度

1 int maxDegreeOfParallelism = Environment.ProcessorCount - 1;
2 ParallelOptions op = new ParallelOptions { MaxDegreeOfParallelism =     maxDegreeOfParallelism };
3 IEnumerable<string> coll = new List<string> { };
4 Parallel.ForEach(coll, op ,(str) =>
5 {
6     //具体操作
7 });

8.6 提早退出并行循环

1)对于Parallel.For

 1 int toExclusive = 10;
 2 Parallel.For(0, toExclusive, (i, loopState) =>
 3 {
 4     //其他操作...
 5     //计算condition
 6     bool condition = ...;
 7     if (condition)
 8     {
 9         loopState.Break();//或使用loopState.Stop
10         return;
11     }
12 
13     //其他操作
14 });    

2)对于Parallel.ForEach

 1 IEnumerable<string> coll = new List<string> {"str1","str2" };
 2 Parallel.ForEach(coll, (str, loopState) =>
 3 {
 4     //其他操作...
 5 
 6     //计算condition
 7     bool condition = ...;
 8     if (condition)
 9     {
10         loopState.Break();//或使用loopState.Stop
11         return;
12     }
13 
14     //其他操作
15 
16 });

九 分外处理情势

主干格局

在保管使用AggregateException
能够捕捉到全数的不胜时,可以省去catch(Exception
e)的部分。

 

 1 try
 2 {
 3     //Do something
 4 }
 5 catch(AggregateException e)
 6 {
 7     Foreach(Exception ex in e.InnerExceptions)
 8     {
 9         //Do something
10     }
11 }
12 catch(Exception e)
13 {
14     //Do something
15 }

 

为上述并行循环使用方式加上格外处理机制

①种办法是把相互循环放入try块中,另1种艺术是在历次迭代的经过中抓获万分。

 

 —————————————————————————————–

转发与引用请注脚出处。

时刻匆忙,水平有限,如有不当之处,欢迎指正。

网站地图xml地图