Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/mono.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPetr Onderka <gsvick@gmail.com>2012-08-19 19:41:22 +0400
committerPetr Onderka <gsvick@gmail.com>2012-08-20 02:08:33 +0400
commit96b8ce459b958d989e4e92dd34373f86a03dff8f (patch)
tree40e8f36b3456849422c22164682ebe10974d89de /mcs/class/System.Threading.Tasks.Dataflow
parent890de4e2e08549da82b3b45060ba9df3ae353925 (diff)
Correctly implemented Choose()
Diffstat (limited to 'mcs/class/System.Threading.Tasks.Dataflow')
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj1
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs135
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs6
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources1
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ChooseTest.cs215
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs43
6 files changed, 307 insertions, 94 deletions
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj
index ad1056315e1..078375cf7f3 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj
@@ -47,6 +47,7 @@
<Compile Include="Test\Blocks.cs" />
<Compile Include="System.Threading.Tasks.Dataflow\CompletionHelper.cs" />
<Compile Include="Test\AssertEx.cs" />
+ <Compile Include="Test\System.Threading.Tasks.Dataflow\ChooseTest.cs" />
<Compile Include="Test\System.Threading.Tasks.Dataflow\EncapsulateTest.cs" />
<Compile Include="Test\System.Threading.Tasks.Dataflow\InvalidArgumentsTest.cs" />
<Compile Include="Test\System.Threading.Tasks.Dataflow\OutputAvailableTest.cs" />
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs
index 58488b5b894..36f2eaedce9 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs
@@ -1,6 +1,7 @@
// JoinBlock.cs
//
// Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
@@ -19,66 +20,110 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-//
-//
+namespace System.Threading.Tasks.Dataflow {
+ class ChooserBlock<T1, T2, T3> {
+ class ChooseTarget<TMessage> : ITargetBlock<TMessage> {
+ readonly ChooserBlock<T1, T2, T3> chooserBlock;
+ readonly int index;
+ readonly Action<TMessage> action;
-using System;
-using System.Threading.Tasks;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace System.Threading.Tasks.Dataflow
-{
- internal class ChooserBlock<T1, T2, T3>
- {
- class ChooseTarget<TMessage> : ITargetBlock<TMessage>
- {
- Action<TMessage> messageArrived;
-
- public ChooseTarget (Action<TMessage> messageArrived)
+ public ChooseTarget (ChooserBlock<T1, T2, T3> chooserBlock,
+ int index, Action<TMessage> action)
{
- this.messageArrived = messageArrived;
+ this.chooserBlock = chooserBlock;
+ this.index = index;
+ this.action = action;
}
- public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
- TMessage messageValue,
- ISourceBlock<TMessage> source,
- bool consumeToAccept)
+ public DataflowMessageStatus OfferMessage (
+ DataflowMessageHeader messageHeader, TMessage messageValue,
+ ISourceBlock<TMessage> source, bool consumeToAccept)
{
- messageArrived (messageValue);
+ if (!chooserBlock.canAccept)
+ return DataflowMessageStatus.DecliningPermanently;
+
+ bool lockTaken = false;
+ try {
+ chooserBlock.messageLock.Enter (ref lockTaken);
+ if (!chooserBlock.canAccept)
+ return DataflowMessageStatus.DecliningPermanently;
+
+ if (consumeToAccept) {
+ bool consummed;
+ messageValue = source.ConsumeMessage (messageHeader, this, out consummed);
+ if (!consummed)
+ return DataflowMessageStatus.NotAvailable;
+ }
+
+ chooserBlock.canAccept = false;
+ } finally {
+ if (lockTaken)
+ chooserBlock.messageLock.Exit ();
+ }
+
+ chooserBlock.MessageArrived (index, action, messageValue);
return DataflowMessageStatus.Accepted;
}
public Task Completion {
- get {
- return null;
- }
+ get { return null; }
}
public void Complete ()
{
-
}
public void Fault (Exception exception)
{
-
}
}
- TaskCompletionSource<int> completion = new TaskCompletionSource<int> ();
+ readonly TaskCompletionSource<int> completion = new TaskCompletionSource<int> ();
+
+ SpinLock messageLock;
+ bool canAccept = true;
+
+ public ChooserBlock (
+ Action<T1> action1, Action<T2> action2, Action<T3> action3,
+ DataflowBlockOptions dataflowBlockOptions)
+ {
+ Target1 = new ChooseTarget<T1> (this, 0, action1);
+ Target2 = new ChooseTarget<T2> (this, 1, action2);
+ if (action3 != null)
+ Target3 = new ChooseTarget<T3> (this, 2, action3);
+
+ if (dataflowBlockOptions.CancellationToken != CancellationToken.None)
+ dataflowBlockOptions.CancellationToken.Register (Cancelled);
+ }
- public ChooserBlock (Action<T1> action1, Action<T2> action2, Action<T3> action3, DataflowBlockOptions dataflowBlockOptions)
+ void Cancelled ()
{
- // TODO: take care of options and its cancellation token
+ if (!canAccept)
+ return;
+
+ bool lockTaken = false;
+ try {
+ messageLock.Enter (ref lockTaken);
+ if (!canAccept)
+ return;
+
+ completion.SetCanceled ();
- Target1 = new ChooseTarget<T1> (message => MessageArrived (0, action1, message));
- Target2 = new ChooseTarget<T2> (message => MessageArrived (1, action2, message));
- Target3 = new ChooseTarget<T3> (message => MessageArrived (2, action3, message));
+ canAccept = false;
+ } finally {
+ if (lockTaken)
+ messageLock.Exit ();
+ }
}
- void MessageArrived<TMessage> (int index, Action<TMessage> action, TMessage value)
+ public void AllSourcesCompleted ()
+ {
+ Cancelled ();
+ }
+
+ void MessageArrived<TMessage> (
+ int index, Action<TMessage> action, TMessage value)
{
try {
action (value);
@@ -88,26 +133,14 @@ namespace System.Threading.Tasks.Dataflow
}
}
- public ITargetBlock<T1> Target1 {
- get;
- private set;
- }
+ public ITargetBlock<T1> Target1 { get; private set; }
- public ITargetBlock<T2> Target2 {
- get;
- private set;
- }
+ public ITargetBlock<T2> Target2 { get; private set; }
- public ITargetBlock<T3> Target3 {
- get;
- private set;
- }
+ public ITargetBlock<T3> Target3 { get; private set; }
public Task<int> Completion {
- get {
- return completion.Task;
- }
+ get { return completion.Task; }
}
}
-}
-
+} \ No newline at end of file
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs
index 49c768b7d0e..9b465f801cb 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs
@@ -67,6 +67,9 @@ namespace System.Threading.Tasks.Dataflow {
source1.LinkTo (chooser.Target1);
source2.LinkTo (chooser.Target2);
+ Task.WhenAll (source1.Completion, source2.Completion)
+ .ContinueWith (_ => chooser.AllSourcesCompleted ());
+
return chooser.Completion;
}
@@ -105,6 +108,9 @@ namespace System.Threading.Tasks.Dataflow {
source2.LinkTo (chooser.Target2);
source3.LinkTo (chooser.Target3);
+ Task.WhenAll (source1.Completion, source2.Completion, source3.Completion)
+ .ContinueWith (_ => chooser.AllSourcesCompleted ());
+
return chooser.Completion;
}
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources
index 0deb212393f..a60d96bac31 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources
@@ -25,5 +25,6 @@ System.Threading.Tasks.Dataflow/WriteOnceBlockTest.cs
System.Threading.Tasks.Dataflow/InvalidArgumentsTest.cs
System.Threading.Tasks.Dataflow/OutputAvailableTest.cs
System.Threading.Tasks.Dataflow/EncapsulateTest.cs
+System.Threading.Tasks.Dataflow/ChooseTest.cs
../System.Threading.Tasks.Dataflow/CompletionHelper.cs
../../corlib/System.Threading/AtomicBoolean.cs
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ChooseTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ChooseTest.cs
new file mode 100644
index 00000000000..6d137a05e89
--- /dev/null
+++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ChooseTest.cs
@@ -0,0 +1,215 @@
+// DataflowBlockTest.cs
+//
+// Author:
+// Petr Onderka <gsvick@gmail.com>
+//
+// Copyright (c) 2012 Petr Onderka
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow {
+ [TestFixture]
+ public class ChooseTest {
+ [Test]
+ public void BasicTest ()
+ {
+ var source1 = new BufferBlock<int> ();
+ var source2 = new BufferBlock<long> ();
+
+ bool action1 = false;
+ bool action2 = false;
+ var completion = DataflowBlock.Choose (
+ source1, _ => action1 = true,
+ source2, _ => action2 = true);
+
+ source1.Post (42);
+
+ Assert.IsTrue (completion.Wait (500));
+ Assert.AreEqual (0, completion.Result);
+ Assert.IsTrue (action1);
+ Assert.IsFalse (action2);
+ }
+
+ [Test]
+ public void OnlyOneConsumedTest ()
+ {
+ var source1 = new BufferBlock<int> ();
+ var source2 = new BufferBlock<long> ();
+
+ int action1 = 0;
+ int action2 = 0;
+ var completion = DataflowBlock.Choose (
+ source1, _ => action1++,
+ source2, _ => action2++);
+
+ source1.Post (42);
+ source1.Post (43);
+
+ Assert.IsTrue (completion.Wait (500));
+ Assert.AreEqual (0, completion.Result);
+ Assert.AreEqual (1, action1);
+ Assert.AreEqual (0, action2);
+
+ int item;
+ Assert.IsTrue (source1.TryReceive (out item));
+ Assert.AreEqual (43, item);
+ }
+
+ [Test]
+ public void RaceTest ()
+ {
+ var source1 = new BufferBlock<int> ();
+ var source2 = new BufferBlock<int> ();
+
+ int action1 = 0;
+ int action2 = 0;
+ var completion = DataflowBlock.Choose (
+ source1, _ => action1++,
+ source2, _ => action2++);
+
+ var barrier = new Barrier (2);
+
+ Task.Run (() =>
+ {
+ barrier.SignalAndWait ();
+ source1.Post (10);
+ });
+ Task.Run (() =>
+ {
+ barrier.SignalAndWait ();
+ source2.Post (20);
+ });
+
+ Assert.IsTrue (completion.Wait (500));
+ Assert.AreEqual (1, action1 + action2);
+
+ int item;
+ Assert.IsTrue (source1.TryReceive (out item) || source2.TryReceive (out item));
+ }
+
+ [Test]
+ public void BlockCompletionTest ()
+ {
+ var source1 = new BufferBlock<int> ();
+ var source2 = new BufferBlock<long> ();
+
+ var completion = DataflowBlock.Choose (
+ source1, _ => { }, source2, _ => { });
+
+ Assert.IsFalse (completion.IsCanceled);
+
+ ((IDataflowBlock)source1).Fault (new Exception ());
+ source2.Complete ();
+
+ Thread.Sleep (100);
+
+ Assert.IsTrue (completion.IsCanceled);
+ }
+
+ [Test]
+ public void CancellationTest ()
+ {
+ var source1 = new BufferBlock<int> ();
+ var source2 = new BufferBlock<long> ();
+
+ var tokenSource = new CancellationTokenSource ();
+ var options = new DataflowBlockOptions
+ { CancellationToken = tokenSource.Token };
+
+ var completion = DataflowBlock.Choose (
+ source1, _ => { }, source2, _ => { }, options);
+
+ Assert.IsFalse (completion.IsCanceled);
+
+ tokenSource.Cancel ();
+
+ Thread.Sleep (100);
+
+ Assert.IsTrue (completion.IsCanceled);
+ }
+
+ [Test]
+ public void ConsumeToAcceptTest ()
+ {
+ var source1 = new BroadcastBlock<int> (_ => 42);
+ var source2 = new BufferBlock<int> ();
+
+ int action1 = 0;
+ int action2 = 0;
+ var completion = DataflowBlock.Choose (
+ source1, i => action1 = i,
+ source2, i => action2 = i);
+
+ source1.Post (10);
+
+ Assert.IsTrue (completion.Wait (500));
+ Assert.AreEqual (0, completion.Result);
+ Assert.AreEqual (42, action1);
+ Assert.AreEqual (0, action2);
+ }
+
+ [Test]
+ public void ExceptionTest ()
+ {
+ var source1 = new BufferBlock<int> ();
+ var source2 = new BufferBlock<long> ();
+
+ var exception = new Exception ();
+ var completion = DataflowBlock.Choose (
+ source1, _ => { throw exception; },
+ source2, _ => { });
+
+ source1.Post (42);
+
+ var ae = AssertEx.Throws<AggregateException> (() => completion.Wait (500));
+ Assert.AreEqual (1, ae.InnerExceptions.Count);
+ Assert.AreSame (exception, ae.InnerException);
+ }
+
+ [Test]
+ public void BasicTest_3 ()
+ {
+ var source1 = new BufferBlock<int> ();
+ var source2 = new BufferBlock<long> ();
+ var source3 = new BufferBlock<object> ();
+
+ bool action1 = false;
+ bool action2 = false;
+ bool action3 = false;
+ var completion = DataflowBlock.Choose (
+ source1, _ => action1 = true,
+ source2, _ => action2 = true,
+ source3, _ => action3 = true);
+
+ source3.Post (new object ());
+
+ Assert.IsTrue (completion.Wait (500));
+ Assert.AreEqual (2, completion.Result);
+ Assert.IsFalse (action1);
+ Assert.IsFalse (action2);
+ Assert.IsTrue (action3);
+ }
+ }
+} \ No newline at end of file
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs
index 550d548b482..4ebff304fc9 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs
@@ -35,49 +35,6 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
[TestFixture]
public class DataflowBlockTest {
[Test]
- public void ChooseTest ()
- {
- var source1 = new BufferBlock<int> ();
- var source2 = new BufferBlock<long> ();
-
- bool action1 = false;
- bool action2 = false;
- var completion = DataflowBlock.Choose (source1, (_) => action1 = true, source2, (_) => action2 = true);
-
- source1.Post (42);
-
- Thread.Sleep (1600);
- Assert.IsTrue (action1);
- Assert.IsFalse (action2);
- Assert.IsTrue (completion.IsCompleted);
- Assert.AreEqual (TaskStatus.RanToCompletion, completion.Status);
- Assert.AreEqual (0, completion.Result);
- }
-
- [Test]
- public void ChooseTest_3 ()
- {
- var source1 = new BufferBlock<int> ();
- var source2 = new BufferBlock<long> ();
- var source3 = new BufferBlock<object> ();
-
- bool action1 = false;
- bool action2 = false;
- bool action3 = false;
- var completion = DataflowBlock.Choose (source1, (_) => action1 = true, source2, (_) => action2 = true, source3, (_) => action3 = true);
-
- source3.Post (new object ());
-
- Thread.Sleep (1600);
- Assert.IsFalse (action1);
- Assert.IsFalse (action2);
- Assert.IsTrue (action3);
- Assert.IsTrue (completion.IsCompleted);
- Assert.AreEqual (TaskStatus.RanToCompletion, completion.Status);
- Assert.AreEqual (2, completion.Result);
- }
-
- [Test]
public void TryReceiveTest ()
{
var block = new BufferBlock<int> ();