diff options
author | Petr Onderka <gsvick@gmail.com> | 2012-08-19 19:41:22 +0400 |
---|---|---|
committer | Petr Onderka <gsvick@gmail.com> | 2012-08-20 02:08:33 +0400 |
commit | 96b8ce459b958d989e4e92dd34373f86a03dff8f (patch) | |
tree | 40e8f36b3456849422c22164682ebe10974d89de /mcs/class/System.Threading.Tasks.Dataflow | |
parent | 890de4e2e08549da82b3b45060ba9df3ae353925 (diff) |
Correctly implemented Choose()
Diffstat (limited to 'mcs/class/System.Threading.Tasks.Dataflow')
6 files changed, 307 insertions, 94 deletions
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj index ad1056315e1..078375cf7f3 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj @@ -47,6 +47,7 @@ <Compile Include="Test\Blocks.cs" />
<Compile Include="System.Threading.Tasks.Dataflow\CompletionHelper.cs" />
<Compile Include="Test\AssertEx.cs" />
+ <Compile Include="Test\System.Threading.Tasks.Dataflow\ChooseTest.cs" />
<Compile Include="Test\System.Threading.Tasks.Dataflow\EncapsulateTest.cs" />
<Compile Include="Test\System.Threading.Tasks.Dataflow\InvalidArgumentsTest.cs" />
<Compile Include="Test\System.Threading.Tasks.Dataflow\OutputAvailableTest.cs" />
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs index 58488b5b894..36f2eaedce9 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs @@ -1,6 +1,7 @@ // JoinBlock.cs // // Copyright (c) 2011 Jérémie "garuma" Laval +// Copyright (c) 2012 Petr Onderka // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -19,66 +20,110 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// -// +namespace System.Threading.Tasks.Dataflow { + class ChooserBlock<T1, T2, T3> { + class ChooseTarget<TMessage> : ITargetBlock<TMessage> { + readonly ChooserBlock<T1, T2, T3> chooserBlock; + readonly int index; + readonly Action<TMessage> action; -using System; -using System.Threading.Tasks; -using System.Collections.Generic; -using System.Collections.Concurrent; - -namespace System.Threading.Tasks.Dataflow -{ - internal class ChooserBlock<T1, T2, T3> - { - class ChooseTarget<TMessage> : ITargetBlock<TMessage> - { - Action<TMessage> messageArrived; - - public ChooseTarget (Action<TMessage> messageArrived) + public ChooseTarget (ChooserBlock<T1, T2, T3> chooserBlock, + int index, Action<TMessage> action) { - this.messageArrived = messageArrived; + this.chooserBlock = chooserBlock; + this.index = index; + this.action = action; } - public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader, - TMessage messageValue, - ISourceBlock<TMessage> source, - bool consumeToAccept) + public DataflowMessageStatus OfferMessage ( + DataflowMessageHeader messageHeader, TMessage messageValue, + ISourceBlock<TMessage> source, bool consumeToAccept) { - messageArrived (messageValue); + if (!chooserBlock.canAccept) + return DataflowMessageStatus.DecliningPermanently; + + bool lockTaken = false; + try { + chooserBlock.messageLock.Enter (ref lockTaken); + if (!chooserBlock.canAccept) + return DataflowMessageStatus.DecliningPermanently; + + if (consumeToAccept) { + bool consummed; + messageValue = source.ConsumeMessage (messageHeader, this, out consummed); + if (!consummed) + return DataflowMessageStatus.NotAvailable; + } + + chooserBlock.canAccept = false; + } finally { + if (lockTaken) + chooserBlock.messageLock.Exit (); + } + + chooserBlock.MessageArrived (index, action, messageValue); return DataflowMessageStatus.Accepted; } public Task Completion { - get { - return null; - } + get { return null; } } public void Complete () { - } public void Fault (Exception exception) { - } } - TaskCompletionSource<int> completion = new TaskCompletionSource<int> (); + readonly TaskCompletionSource<int> completion = new TaskCompletionSource<int> (); + + SpinLock messageLock; + bool canAccept = true; + + public ChooserBlock ( + Action<T1> action1, Action<T2> action2, Action<T3> action3, + DataflowBlockOptions dataflowBlockOptions) + { + Target1 = new ChooseTarget<T1> (this, 0, action1); + Target2 = new ChooseTarget<T2> (this, 1, action2); + if (action3 != null) + Target3 = new ChooseTarget<T3> (this, 2, action3); + + if (dataflowBlockOptions.CancellationToken != CancellationToken.None) + dataflowBlockOptions.CancellationToken.Register (Cancelled); + } - public ChooserBlock (Action<T1> action1, Action<T2> action2, Action<T3> action3, DataflowBlockOptions dataflowBlockOptions) + void Cancelled () { - // TODO: take care of options and its cancellation token + if (!canAccept) + return; + + bool lockTaken = false; + try { + messageLock.Enter (ref lockTaken); + if (!canAccept) + return; + + completion.SetCanceled (); - Target1 = new ChooseTarget<T1> (message => MessageArrived (0, action1, message)); - Target2 = new ChooseTarget<T2> (message => MessageArrived (1, action2, message)); - Target3 = new ChooseTarget<T3> (message => MessageArrived (2, action3, message)); + canAccept = false; + } finally { + if (lockTaken) + messageLock.Exit (); + } } - void MessageArrived<TMessage> (int index, Action<TMessage> action, TMessage value) + public void AllSourcesCompleted () + { + Cancelled (); + } + + void MessageArrived<TMessage> ( + int index, Action<TMessage> action, TMessage value) { try { action (value); @@ -88,26 +133,14 @@ namespace System.Threading.Tasks.Dataflow } } - public ITargetBlock<T1> Target1 { - get; - private set; - } + public ITargetBlock<T1> Target1 { get; private set; } - public ITargetBlock<T2> Target2 { - get; - private set; - } + public ITargetBlock<T2> Target2 { get; private set; } - public ITargetBlock<T3> Target3 { - get; - private set; - } + public ITargetBlock<T3> Target3 { get; private set; } public Task<int> Completion { - get { - return completion.Task; - } + get { return completion.Task; } } } -} - +}
\ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs index 49c768b7d0e..9b465f801cb 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs @@ -67,6 +67,9 @@ namespace System.Threading.Tasks.Dataflow { source1.LinkTo (chooser.Target1); source2.LinkTo (chooser.Target2); + Task.WhenAll (source1.Completion, source2.Completion) + .ContinueWith (_ => chooser.AllSourcesCompleted ()); + return chooser.Completion; } @@ -105,6 +108,9 @@ namespace System.Threading.Tasks.Dataflow { source2.LinkTo (chooser.Target2); source3.LinkTo (chooser.Target3); + Task.WhenAll (source1.Completion, source2.Completion, source3.Completion) + .ContinueWith (_ => chooser.AllSourcesCompleted ()); + return chooser.Completion; } diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources index 0deb212393f..a60d96bac31 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources @@ -25,5 +25,6 @@ System.Threading.Tasks.Dataflow/WriteOnceBlockTest.cs System.Threading.Tasks.Dataflow/InvalidArgumentsTest.cs System.Threading.Tasks.Dataflow/OutputAvailableTest.cs System.Threading.Tasks.Dataflow/EncapsulateTest.cs +System.Threading.Tasks.Dataflow/ChooseTest.cs ../System.Threading.Tasks.Dataflow/CompletionHelper.cs ../../corlib/System.Threading/AtomicBoolean.cs diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ChooseTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ChooseTest.cs new file mode 100644 index 00000000000..6d137a05e89 --- /dev/null +++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ChooseTest.cs @@ -0,0 +1,215 @@ +// DataflowBlockTest.cs +// +// Author: +// Petr Onderka <gsvick@gmail.com> +// +// Copyright (c) 2012 Petr Onderka +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using NUnit.Framework; + +namespace MonoTests.System.Threading.Tasks.Dataflow { + [TestFixture] + public class ChooseTest { + [Test] + public void BasicTest () + { + var source1 = new BufferBlock<int> (); + var source2 = new BufferBlock<long> (); + + bool action1 = false; + bool action2 = false; + var completion = DataflowBlock.Choose ( + source1, _ => action1 = true, + source2, _ => action2 = true); + + source1.Post (42); + + Assert.IsTrue (completion.Wait (500)); + Assert.AreEqual (0, completion.Result); + Assert.IsTrue (action1); + Assert.IsFalse (action2); + } + + [Test] + public void OnlyOneConsumedTest () + { + var source1 = new BufferBlock<int> (); + var source2 = new BufferBlock<long> (); + + int action1 = 0; + int action2 = 0; + var completion = DataflowBlock.Choose ( + source1, _ => action1++, + source2, _ => action2++); + + source1.Post (42); + source1.Post (43); + + Assert.IsTrue (completion.Wait (500)); + Assert.AreEqual (0, completion.Result); + Assert.AreEqual (1, action1); + Assert.AreEqual (0, action2); + + int item; + Assert.IsTrue (source1.TryReceive (out item)); + Assert.AreEqual (43, item); + } + + [Test] + public void RaceTest () + { + var source1 = new BufferBlock<int> (); + var source2 = new BufferBlock<int> (); + + int action1 = 0; + int action2 = 0; + var completion = DataflowBlock.Choose ( + source1, _ => action1++, + source2, _ => action2++); + + var barrier = new Barrier (2); + + Task.Run (() => + { + barrier.SignalAndWait (); + source1.Post (10); + }); + Task.Run (() => + { + barrier.SignalAndWait (); + source2.Post (20); + }); + + Assert.IsTrue (completion.Wait (500)); + Assert.AreEqual (1, action1 + action2); + + int item; + Assert.IsTrue (source1.TryReceive (out item) || source2.TryReceive (out item)); + } + + [Test] + public void BlockCompletionTest () + { + var source1 = new BufferBlock<int> (); + var source2 = new BufferBlock<long> (); + + var completion = DataflowBlock.Choose ( + source1, _ => { }, source2, _ => { }); + + Assert.IsFalse (completion.IsCanceled); + + ((IDataflowBlock)source1).Fault (new Exception ()); + source2.Complete (); + + Thread.Sleep (100); + + Assert.IsTrue (completion.IsCanceled); + } + + [Test] + public void CancellationTest () + { + var source1 = new BufferBlock<int> (); + var source2 = new BufferBlock<long> (); + + var tokenSource = new CancellationTokenSource (); + var options = new DataflowBlockOptions + { CancellationToken = tokenSource.Token }; + + var completion = DataflowBlock.Choose ( + source1, _ => { }, source2, _ => { }, options); + + Assert.IsFalse (completion.IsCanceled); + + tokenSource.Cancel (); + + Thread.Sleep (100); + + Assert.IsTrue (completion.IsCanceled); + } + + [Test] + public void ConsumeToAcceptTest () + { + var source1 = new BroadcastBlock<int> (_ => 42); + var source2 = new BufferBlock<int> (); + + int action1 = 0; + int action2 = 0; + var completion = DataflowBlock.Choose ( + source1, i => action1 = i, + source2, i => action2 = i); + + source1.Post (10); + + Assert.IsTrue (completion.Wait (500)); + Assert.AreEqual (0, completion.Result); + Assert.AreEqual (42, action1); + Assert.AreEqual (0, action2); + } + + [Test] + public void ExceptionTest () + { + var source1 = new BufferBlock<int> (); + var source2 = new BufferBlock<long> (); + + var exception = new Exception (); + var completion = DataflowBlock.Choose ( + source1, _ => { throw exception; }, + source2, _ => { }); + + source1.Post (42); + + var ae = AssertEx.Throws<AggregateException> (() => completion.Wait (500)); + Assert.AreEqual (1, ae.InnerExceptions.Count); + Assert.AreSame (exception, ae.InnerException); + } + + [Test] + public void BasicTest_3 () + { + var source1 = new BufferBlock<int> (); + var source2 = new BufferBlock<long> (); + var source3 = new BufferBlock<object> (); + + bool action1 = false; + bool action2 = false; + bool action3 = false; + var completion = DataflowBlock.Choose ( + source1, _ => action1 = true, + source2, _ => action2 = true, + source3, _ => action3 = true); + + source3.Post (new object ()); + + Assert.IsTrue (completion.Wait (500)); + Assert.AreEqual (2, completion.Result); + Assert.IsFalse (action1); + Assert.IsFalse (action2); + Assert.IsTrue (action3); + } + } +}
\ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs index 550d548b482..4ebff304fc9 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs @@ -35,49 +35,6 @@ namespace MonoTests.System.Threading.Tasks.Dataflow { [TestFixture] public class DataflowBlockTest { [Test] - public void ChooseTest () - { - var source1 = new BufferBlock<int> (); - var source2 = new BufferBlock<long> (); - - bool action1 = false; - bool action2 = false; - var completion = DataflowBlock.Choose (source1, (_) => action1 = true, source2, (_) => action2 = true); - - source1.Post (42); - - Thread.Sleep (1600); - Assert.IsTrue (action1); - Assert.IsFalse (action2); - Assert.IsTrue (completion.IsCompleted); - Assert.AreEqual (TaskStatus.RanToCompletion, completion.Status); - Assert.AreEqual (0, completion.Result); - } - - [Test] - public void ChooseTest_3 () - { - var source1 = new BufferBlock<int> (); - var source2 = new BufferBlock<long> (); - var source3 = new BufferBlock<object> (); - - bool action1 = false; - bool action2 = false; - bool action3 = false; - var completion = DataflowBlock.Choose (source1, (_) => action1 = true, source2, (_) => action2 = true, source3, (_) => action3 = true); - - source3.Post (new object ()); - - Thread.Sleep (1600); - Assert.IsFalse (action1); - Assert.IsFalse (action2); - Assert.IsTrue (action3); - Assert.IsTrue (completion.IsCompleted); - Assert.AreEqual (TaskStatus.RanToCompletion, completion.Status); - Assert.AreEqual (2, completion.Result); - } - - [Test] public void TryReceiveTest () { var block = new BufferBlock<int> (); |