Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/mono.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPetr Onderka <gsvick@gmail.com>2012-08-19 22:34:18 +0400
committerPetr Onderka <gsvick@gmail.com>2012-08-20 02:09:05 +0400
commit2c25c41e032eda21b18b09455b8df1077c4d1075 (patch)
treef41d1ffe783b6b8798954b41f2e2642cbfa8be8d /mcs/class/System.Threading.Tasks.Dataflow
parent06bcfa044fd178e00f397cafac82b6c673b3d667 (diff)
Finished adding documentation for non-public types and methods
Diffstat (limited to 'mcs/class/System.Threading.Tasks.Dataflow')
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs2
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs2
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs33
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlockOptions.cs3
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutionDataflowBlockOptions.cs3
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs3
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IDataflowBlock.cs19
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IReceivableSourceBlock.cs16
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs42
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs53
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs34
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs41
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObservableDataflowBlock.cs41
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObserverDataflowBlock.cs23
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueue.cs9
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueueBase.cs46
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutputAvailableBlock.cs13
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs14
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs3
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ReceiveBlock.cs20
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs18
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetCollection.cs138
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs24
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs27
-rw-r--r--mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs3
25 files changed, 479 insertions, 151 deletions
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs
index 20a3a23cf59..312e5f28038 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs
@@ -241,7 +241,7 @@ namespace System.Threading.Tasks.Dataflow {
}
/// <summary>
- /// Creates a batch of the given size and adds the resulting batch to the output queue.
+ /// Creates a batch of the given size and adds the result to the output queue.
/// </summary>
void MakeBatch (int size)
{
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs
index e877c36134a..b2e2884502b 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs
@@ -84,7 +84,7 @@ namespace System.Threading.Tasks.Dataflow {
/// <summary>
/// Takes an item from the queue and sets it as <see cref="CurrentItem"/>.
/// </summary>
- public void DequeueItem()
+ public void DequeueItem ()
{
T item;
if (Outgoing.TryTake (out item)) {
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs
index 36f2eaedce9..bf2d0b03dd6 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs
@@ -22,7 +22,13 @@
// THE SOFTWARE.
namespace System.Threading.Tasks.Dataflow {
+ /// <summary>
+ /// Block used in all versions of <see cref="DataflowBlock.Choose"/>.
+ /// </summary>
class ChooserBlock<T1, T2, T3> {
+ /// <summary>
+ /// Target for one of the sources to choose from.
+ /// </summary>
class ChooseTarget<TMessage> : ITargetBlock<TMessage> {
readonly ChooserBlock<T1, T2, T3> chooserBlock;
readonly int index;
@@ -97,6 +103,11 @@ namespace System.Threading.Tasks.Dataflow {
dataflowBlockOptions.CancellationToken.Register (Cancelled);
}
+ /// <summary>
+ /// Causes cancellation of <see cref="Completion"/>.
+ /// If a message is already being consumed (and the consumsing succeeds)
+ /// or if its action is being invoked, the Task is not cancelled.
+ /// </summary>
void Cancelled ()
{
if (!canAccept)
@@ -117,11 +128,19 @@ namespace System.Threading.Tasks.Dataflow {
}
}
+ /// <summary>
+ /// Called when all sources have completed,
+ /// causes cancellation of <see cref="Completion"/>.
+ /// </summary>
public void AllSourcesCompleted ()
{
Cancelled ();
}
+ /// <summary>
+ /// Called when message has arrived (and was consumed, if necessary).
+ /// This method can be called only once in the lifetime of this object.
+ /// </summary>
void MessageArrived<TMessage> (
int index, Action<TMessage> action, TMessage value)
{
@@ -133,12 +152,26 @@ namespace System.Threading.Tasks.Dataflow {
}
}
+ /// <summary>
+ /// Target block for the first source block.
+ /// </summary>
public ITargetBlock<T1> Target1 { get; private set; }
+ /// <summary>
+ /// Target block for the second source block.
+ /// </summary>
public ITargetBlock<T2> Target2 { get; private set; }
+ /// <summary>
+ /// Target block for the third source block.
+ /// Is <c>null</c> if there are only two actions.
+ /// </summary>
public ITargetBlock<T3> Target3 { get; private set; }
+ /// <summary>
+ /// Task that signifies that an item was accepted and
+ /// its action has been called.
+ /// </summary>
public Task<int> Completion {
get { return completion.Task; }
}
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlockOptions.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlockOptions.cs
index c24f718345a..608e22f1425 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlockOptions.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlockOptions.cs
@@ -26,6 +26,9 @@ namespace System.Threading.Tasks.Dataflow {
static readonly DataflowBlockOptions DefaultOptions =
new DataflowBlockOptions ();
+ /// <summary>
+ /// Cached default block options
+ /// </summary>
internal static DataflowBlockOptions Default {
get { return DefaultOptions; }
}
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutionDataflowBlockOptions.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutionDataflowBlockOptions.cs
index 7506b3fd22e..2b5658208c7 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutionDataflowBlockOptions.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutionDataflowBlockOptions.cs
@@ -28,6 +28,9 @@ namespace System.Threading.Tasks.Dataflow {
int maxDegreeOfParallelism;
+ /// <summary>
+ /// Cached default block options
+ /// </summary>
internal static new ExecutionDataflowBlockOptions Default {
get { return DefaultOptions; }
}
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs
index 99cfa23638f..f7a24d4b6fe 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs
@@ -29,6 +29,9 @@ namespace System.Threading.Tasks.Dataflow
long maxNumberOfGroups;
+ /// <summary>
+ /// Cached default block options
+ /// </summary>
internal static new GroupingDataflowBlockOptions Default {
get { return DefaultOptions; }
}
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IDataflowBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IDataflowBlock.cs
index 3ae427ca8c0..a6e69b5630e 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IDataflowBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IDataflowBlock.cs
@@ -19,23 +19,12 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-//
-//
-
-using System;
-using System.Threading.Tasks;
-
-namespace System.Threading.Tasks.Dataflow
-{
- public interface IDataflowBlock
- {
- Task Completion {
- get;
- }
+namespace System.Threading.Tasks.Dataflow {
+ public interface IDataflowBlock {
+ Task Completion { get; }
void Complete ();
void Fault (Exception exception);
}
-}
-
+} \ No newline at end of file
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IReceivableSourceBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IReceivableSourceBlock.cs
index 2bb39a21b51..dc35f58cfbb 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IReceivableSourceBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IReceivableSourceBlock.cs
@@ -19,20 +19,12 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-//
-//
-
-using System;
-using System.Threading.Tasks;
using System.Collections.Generic;
-namespace System.Threading.Tasks.Dataflow
-{
- public interface IReceivableSourceBlock<TOutput> : ISourceBlock<TOutput>, IDataflowBlock
- {
+namespace System.Threading.Tasks.Dataflow {
+ public interface IReceivableSourceBlock<TOutput> : ISourceBlock<TOutput> {
bool TryReceive (Predicate<TOutput> filter, out TOutput item);
bool TryReceiveAll (out IList<TOutput> items);
- }
-}
-
+ }
+} \ No newline at end of file
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs
index 1117926dcd9..fb424fc0e8b 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs
@@ -52,10 +52,10 @@ namespace System.Threading.Tasks.Dataflow
this.dataflowBlockOptions = dataflowBlockOptions;
compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
- target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper,
+ target1 = new JoinTarget<T1> (this, SignalArrivalTarget, compHelper,
() => outgoing.IsCompleted, dataflowBlockOptions,
dataflowBlockOptions.Greedy, TryAdd1);
- target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper,
+ target2 = new JoinTarget<T2> (this, SignalArrivalTarget, compHelper,
() => outgoing.IsCompleted, dataflowBlockOptions,
dataflowBlockOptions.Greedy, TryAdd2);
outgoing = new OutgoingQueue<Tuple<T1, T2>> (this, compHelper,
@@ -114,11 +114,13 @@ namespace System.Threading.Tasks.Dataflow
}
public Task Completion {
- get {
- return compHelper.Completion;
- }
+ get { return compHelper.Completion; }
}
+ /// <summary>
+ /// Returns whether a new item can be accepted by the first target,
+ /// and increments a counter if it can.
+ /// </summary>
bool TryAdd1 ()
{
return dataflowBlockOptions.MaxNumberOfGroups == -1
@@ -126,6 +128,10 @@ namespace System.Threading.Tasks.Dataflow
<= dataflowBlockOptions.MaxNumberOfGroups;
}
+ /// <summary>
+ /// Returns whether a new item can be accepted by the second target,
+ /// and increments a counter if it can.
+ /// </summary>
bool TryAdd2 ()
{
return dataflowBlockOptions.MaxNumberOfGroups == -1
@@ -133,7 +139,10 @@ namespace System.Threading.Tasks.Dataflow
<= dataflowBlockOptions.MaxNumberOfGroups;
}
- void SignalArrivalTargetImpl()
+ /// <summary>
+ /// Decides whether to create a new tuple or not.
+ /// </summary>
+ void SignalArrivalTarget ()
{
if (dataflowBlockOptions.Greedy) {
bool taken = false;
@@ -160,6 +169,9 @@ namespace System.Threading.Tasks.Dataflow
}
}
+ /// <summary>
+ /// Returns whether non-greedy creation of a tuple should be started.
+ /// </summary>
bool ShouldProcessNonGreedy ()
{
return target1.PostponedMessagesCount >= 1
@@ -168,6 +180,9 @@ namespace System.Threading.Tasks.Dataflow
|| outgoing.Count < dataflowBlockOptions.BoundedCapacity);
}
+ /// <summary>
+ /// Starts non-greedy creation of tuples, if one doesn't already run.
+ /// </summary>
void EnsureNonGreedyProcessing ()
{
if (nonGreedyProcessing.TrySet ())
@@ -177,6 +192,10 @@ namespace System.Threading.Tasks.Dataflow
dataflowBlockOptions.TaskScheduler);
}
+ /// <summary>
+ /// Creates tuples in non-greedy mode,
+ /// making sure the whole tuple is available by using reservations.
+ /// </summary>
void NonGreedyProcess()
{
while (ShouldProcessNonGreedy ()) {
@@ -204,6 +223,9 @@ namespace System.Threading.Tasks.Dataflow
}
+ /// <summary>
+ /// Creates a tuple from the given values and adds the result to the output queue.
+ /// </summary>
void TriggerMessage (T1 val1, T2 val2)
{
outgoing.AddData (Tuple.Create (val1, val2));
@@ -215,15 +237,11 @@ namespace System.Threading.Tasks.Dataflow
}
public ITargetBlock<T1> Target1 {
- get {
- return target1;
- }
+ get { return target1; }
}
public ITargetBlock<T2> Target2 {
- get {
- return target2;
- }
+ get { return target2; }
}
public int OutputCount {
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs
index 6b0800ce394..419beb75c60 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs
@@ -55,13 +55,13 @@ namespace System.Threading.Tasks.Dataflow
this.dataflowBlockOptions = dataflowBlockOptions;
this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
- target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper,
+ target1 = new JoinTarget<T1> (this, SignalArrivalTarget, compHelper,
() => outgoing.IsCompleted, dataflowBlockOptions,
dataflowBlockOptions.Greedy, TryAdd1);
- target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper,
+ target2 = new JoinTarget<T2> (this, SignalArrivalTarget, compHelper,
() => outgoing.IsCompleted, dataflowBlockOptions,
dataflowBlockOptions.Greedy, TryAdd2);
- target3 = new JoinTarget<T3> (this, SignalArrivalTargetImpl, compHelper,
+ target3 = new JoinTarget<T3> (this, SignalArrivalTarget, compHelper,
() => outgoing.IsCompleted, dataflowBlockOptions,
dataflowBlockOptions.Greedy, TryAdd3);
outgoing = new OutgoingQueue<Tuple<T1, T2, T3>> (
@@ -124,11 +124,13 @@ namespace System.Threading.Tasks.Dataflow
}
public Task Completion {
- get {
- return compHelper.Completion;
- }
+ get { return compHelper.Completion; }
}
+ /// <summary>
+ /// Returns whether a new item can be accepted by the first target,
+ /// and increments a counter if it can.
+ /// </summary>
bool TryAdd1 ()
{
return dataflowBlockOptions.MaxNumberOfGroups == -1
@@ -136,6 +138,10 @@ namespace System.Threading.Tasks.Dataflow
<= dataflowBlockOptions.MaxNumberOfGroups;
}
+ /// <summary>
+ /// Returns whether a new item can be accepted by the second target,
+ /// and increments a counter if it can.
+ /// </summary>
bool TryAdd2 ()
{
return dataflowBlockOptions.MaxNumberOfGroups == -1
@@ -143,13 +149,21 @@ namespace System.Threading.Tasks.Dataflow
<= dataflowBlockOptions.MaxNumberOfGroups;
}
+ /// <summary>
+ /// Returns whether a new item can be accepted by the third target,
+ /// and increments a counter if it can.
+ /// </summary>
bool TryAdd3 ()
{
return dataflowBlockOptions.MaxNumberOfGroups == -1
|| Interlocked.Increment (ref target3Count)
<= dataflowBlockOptions.MaxNumberOfGroups;
}
- void SignalArrivalTargetImpl ()
+
+ /// <summary>
+ /// Decides whether to create a new tuple or not.
+ /// </summary>
+ void SignalArrivalTarget ()
{
if (dataflowBlockOptions.Greedy) {
bool taken = false;
@@ -179,6 +193,9 @@ namespace System.Threading.Tasks.Dataflow
}
}
+ /// <summary>
+ /// Returns whether non-greedy creation of a tuple should be started.
+ /// </summary>
bool ShouldProcesNonGreedy ()
{
return target1.PostponedMessagesCount >= 1
@@ -188,6 +205,9 @@ namespace System.Threading.Tasks.Dataflow
|| outgoing.Count < dataflowBlockOptions.BoundedCapacity);
}
+ /// <summary>
+ /// Starts non-greedy creation of tuples, if one doesn't already run.
+ /// </summary>
void EnsureNonGreedyProcessing ()
{
if (nonGreedyProcessing.TrySet())
@@ -197,6 +217,10 @@ namespace System.Threading.Tasks.Dataflow
dataflowBlockOptions.TaskScheduler);
}
+ /// <summary>
+ /// Creates tuples in non-greedy mode,
+ /// making sure the whole tuple is available by using reservations.
+ /// </summary>
void NonGreedyProcess ()
{
while (ShouldProcesNonGreedy ()) {
@@ -231,6 +255,9 @@ namespace System.Threading.Tasks.Dataflow
EnsureNonGreedyProcessing ();
}
+ /// <summary>
+ /// Creates a tuple from the given values and adds the result to the output queue.
+ /// </summary>
void TriggerMessage (T1 val1, T2 val2, T3 val3)
{
outgoing.AddData (Tuple.Create (val1, val2, val3));
@@ -242,21 +269,15 @@ namespace System.Threading.Tasks.Dataflow
}
public ITargetBlock<T1> Target1 {
- get {
- return target1;
- }
+ get { return target1; }
}
public ITargetBlock<T2> Target2 {
- get {
- return target2;
- }
+ get { return target2; }
}
public ITargetBlock<T3> Target3 {
- get {
- return target3;
- }
+ get { return target3; }
}
public int OutputCount {
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs
index 8cceeddbd0a..48f293e1a47 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs
@@ -1,6 +1,7 @@
-// JoinBlock.cs
+// JoinBlock.cs
//
-// Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
@@ -23,8 +24,11 @@
using System.Collections.Concurrent;
namespace System.Threading.Tasks.Dataflow {
- internal class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget>
- {
+ /// <summary>
+ /// Target block use by join blocks in their TargetN properties.
+ /// Also serves as its own <see cref="MessageBox{TInput}"/>.
+ /// </summary>
+ class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget> {
readonly IDataflowBlock joinBlock;
readonly Action signal;
@@ -40,21 +44,25 @@ namespace System.Threading.Tasks.Dataflow {
Target = this;
}
+ /// <summary>
+ /// Makes sure the input queue is processed the way it needs to,
+ /// by signaling the parent join block.
+ /// </summary>
protected override void EnsureProcessing (bool newItem)
{
signal ();
}
+ /// <summary>
+ /// The input queue of this block.
+ /// </summary>
public BlockingCollection<TTarget> Buffer {
- get {
- return MessageQueue;
- }
+ get { return MessageQueue; }
}
- DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader,
- TTarget messageValue,
- ISourceBlock<TTarget> source,
- bool consumeToAccept)
+ DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (
+ DataflowMessageHeader messageHeader, TTarget messageValue,
+ ISourceBlock<TTarget> source, bool consumeToAccept)
{
return OfferMessage (messageHeader, messageValue, source, consumeToAccept);
}
@@ -65,9 +73,7 @@ namespace System.Threading.Tasks.Dataflow {
}
Task IDataflowBlock.Completion {
- get {
- throw new NotSupportedException();
- }
+ get { throw new NotSupportedException (); }
}
void IDataflowBlock.Fault (Exception exception)
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs
index 14ab8541fe8..239a610f727 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs
@@ -146,11 +146,21 @@ namespace System.Threading.Tasks.Dataflow {
return DataflowMessageStatus.Accepted;
}
+ /// <summary>
+ /// Increses the count of items in the block by 1.
+ /// </summary>
public void IncreaseCount ()
{
Interlocked.Increment (ref itemCount);
}
+ /// <summary>
+ /// Decreses the number of items in the block by the given count.
+ /// </summary>
+ /// <remarks>
+ /// The <paramref name="count"/> parameter is used when one object
+ /// can represent many items, like a batch in <see cref="BatchBlock{T}"/>.
+ /// </remarks>
public void DecreaseCount (int count = 1)
{
int decreased = Interlocked.Add (ref itemCount, -count);
@@ -164,10 +174,22 @@ namespace System.Threading.Tasks.Dataflow {
}
}
+ /// <summary>
+ /// The number of messages that were postponed
+ /// and can be attempted to be consumed.
+ /// </summary>
public int PostponedMessagesCount {
get { return postponedMessages.Count; }
}
+ /// <summary>
+ /// Reserves a message from those that were postponed.
+ /// Does not guarantee any order of the messages being reserved.
+ /// </summary>
+ /// <returns>
+ /// An object representing the reservation on success,
+ /// <c>null</c> on failure.
+ /// </returns>
public Tuple<ISourceBlock<TInput>, DataflowMessageHeader> ReserveMessage()
{
while (!postponedMessages.IsEmpty) {
@@ -193,11 +215,17 @@ namespace System.Threading.Tasks.Dataflow {
return null;
}
+ /// <summary>
+ /// Releases the given reservation.
+ /// </summary>
public void RelaseReservation(Tuple<ISourceBlock<TInput>, DataflowMessageHeader> reservation)
{
reservation.Item1.ReleaseReservation (reservation.Item2, Target);
}
+ /// <summary>
+ /// Consumes previously reserved item.
+ /// </summary>
public TInput ConsumeReserved(Tuple<ISourceBlock<TInput>, DataflowMessageHeader> reservation)
{
bool consumed;
@@ -205,6 +233,11 @@ namespace System.Threading.Tasks.Dataflow {
reservation.Item2, Target, out consumed);
}
+ /// <summary>
+ /// Makes sure retrieving items that were postponed,
+ /// because they would exceed <see cref="DataflowBlockOptions.BoundedCapacity"/>,
+ /// is currently running.
+ /// </summary>
void EnsurePostponedProcessing ()
{
if (postponedProcessing.TrySet())
@@ -212,6 +245,10 @@ namespace System.Threading.Tasks.Dataflow {
TaskCreationOptions.PreferFairness, options.TaskScheduler);
}
+ /// <summary>
+ /// Retrieves items that were postponed,
+ /// because they would exceed <see cref="DataflowBlockOptions.BoundedCapacity"/>.
+ /// </summary>
void RetrievePostponed ()
{
// BoundedCapacity can't be -1 here, because in that case there would be no postponing
@@ -261,6 +298,10 @@ namespace System.Threading.Tasks.Dataflow {
/// <param name="newItem">Was new item just added?</param>
protected abstract void EnsureProcessing (bool newItem);
+ /// <summary>
+ /// Completes the box, no new messages will be accepted.
+ /// Also starts the process of completing the output queue.
+ /// </summary>
public void Complete ()
{
// Make message queue complete
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObservableDataflowBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObservableDataflowBlock.cs
index 64724944281..e82d1306b4a 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObservableDataflowBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObservableDataflowBlock.cs
@@ -1,6 +1,7 @@
// ObservableDataflowBlock.cs
//
// Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
@@ -19,22 +20,14 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-//
-//
-
-using System;
-using System.Threading.Tasks;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace System.Threading.Tasks.Dataflow
-{
- internal class ObservableDataflowBlock<TSource> : IObservable<TSource>
- {
- class ObserverWrapper : ITargetBlock<TSource>
- {
- IObserver<TSource> observer;
+namespace System.Threading.Tasks.Dataflow {
+ /// <summary>
+ /// Rx Observable that represents a source block.
+ /// </summary>
+ class ObservableDataflowBlock<TSource> : IObservable<TSource> {
+ class ObserverWrapper : ITargetBlock<TSource> {
+ readonly IObserver<TSource> observer;
public ObserverWrapper (IObserver<TSource> observer)
{
@@ -52,15 +45,12 @@ namespace System.Threading.Tasks.Dataflow
}
public Task Completion {
- get {
- return null;
- }
+ get { return null; }
}
- public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
- TSource messageValue,
- ISourceBlock<TSource> source,
- bool consumeToAccept)
+ public DataflowMessageStatus OfferMessage (
+ DataflowMessageHeader messageHeader, TSource messageValue,
+ ISourceBlock<TSource> source, bool consumeToAccept)
{
if (consumeToAccept) {
if (!source.ReserveMessage (messageHeader, this))
@@ -77,7 +67,7 @@ namespace System.Threading.Tasks.Dataflow
}
}
- ISourceBlock<TSource> source;
+ readonly ISourceBlock<TSource> source;
public ObservableDataflowBlock (ISourceBlock<TSource> source)
{
@@ -86,9 +76,8 @@ namespace System.Threading.Tasks.Dataflow
public IDisposable Subscribe (IObserver<TSource> observer)
{
- ObserverWrapper wrapper = new ObserverWrapper (observer);
+ var wrapper = new ObserverWrapper (observer);
return source.LinkTo (wrapper);
}
}
-}
-
+} \ No newline at end of file
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObserverDataflowBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObserverDataflowBlock.cs
index f198dc51c38..55f3b6e8e4f 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObserverDataflowBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObserverDataflowBlock.cs
@@ -1,6 +1,7 @@
// ObserverDataflowBlock.cs
//
// Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
@@ -19,20 +20,13 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-//
-//
-
-using System;
-using System.Threading.Tasks;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace System.Threading.Tasks.Dataflow
-{
- internal class ObserverDataflowBlock<TInput> : IObserver<TInput>
- {
- ITargetBlock<TInput> target;
+namespace System.Threading.Tasks.Dataflow {
+ /// <summary>
+ /// Rx Observer that represents a target block.
+ /// </summary>
+ class ObserverDataflowBlock<TInput> : IObserver<TInput> {
+ readonly ITargetBlock<TInput> target;
public ObserverDataflowBlock (ITargetBlock<TInput> target)
{
@@ -54,5 +48,4 @@ namespace System.Threading.Tasks.Dataflow
target.Post (value);
}
}
-}
-
+} \ No newline at end of file
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueue.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueue.cs
index 2740df5245a..aa273ab2f16 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueue.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueue.cs
@@ -49,6 +49,9 @@ namespace System.Threading.Tasks.Dataflow {
this.countSelector = countSelector;
}
+ /// <summary>
+ /// Calculates the count of items in the given object.
+ /// </summary>
protected override int GetModifiedCount(T data)
{
if (countSelector == null)
@@ -57,6 +60,9 @@ namespace System.Threading.Tasks.Dataflow {
return countSelector (data);
}
+ /// <summary>
+ /// Sends messages to targets.
+ /// </summary>
protected override void Process ()
{
bool processed;
@@ -194,6 +200,9 @@ namespace System.Threading.Tasks.Dataflow {
EnsureProcessing ();
}
+ /// <summary>
+ /// Notifies that the first item in the queue changed.
+ /// </summary>
void FirstItemChanged ()
{
T firstItem;
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueueBase.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueueBase.cs
index 1c96728a0ba..b1c1c294870 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueueBase.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueueBase.cs
@@ -25,7 +25,7 @@ using System.Collections.Concurrent;
namespace System.Threading.Tasks.Dataflow {
/// <summary>
- /// This class handles outgoing message that get queued when there is no
+ /// Handles outgoing messages that get queued when there is no
/// block on the other end to proces it. It also allows receive operations.
/// </summary>
abstract class OutgoingQueueBase<T> {
@@ -54,19 +54,35 @@ namespace System.Threading.Tasks.Dataflow {
this.decreaseItemsCount = decreaseItemsCount;
}
+ /// <summary>
+ /// Is the queue completed?
+ /// Queue is completed after <see cref="Complete"/> is called
+ /// and all items are retrieved from it.
+ /// </summary>
public bool IsCompleted {
get { return Outgoing.IsCompleted; }
}
+ /// <summary>
+ /// Current number of items in the queue.
+ /// Item are counted the way <see cref="DataflowBlockOptions.BoundedCapacity"/>
+ /// counts them, e.g. each item in a batch counts, even if batch is a single object.
+ /// </summary>
public int Count {
get { return totalModifiedCount; }
}
+ /// <summary>
+ /// Calculates the count of items in the given object.
+ /// </summary>
protected virtual int GetModifiedCount (T data)
{
return 1;
}
+ /// <summary>
+ /// Adds an object to the queue.
+ /// </summary>
public void AddData (T data)
{
try {
@@ -79,6 +95,9 @@ namespace System.Threading.Tasks.Dataflow {
}
}
+ /// <summary>
+ /// Makes sure sending messages to targets is running.
+ /// </summary>
protected void EnsureProcessing ()
{
ForceProcessing = true;
@@ -87,13 +106,25 @@ namespace System.Threading.Tasks.Dataflow {
TaskCreationOptions.PreferFairness, options.TaskScheduler);
}
+ /// <summary>
+ /// Indicates whether sending messages should be forced to start.
+ /// </summary>
protected bool ForceProcessing {
get { return forceProcessing; }
set { forceProcessing = value; }
}
+ /// <summary>
+ /// Sends messages to targets.
+ /// </summary>
protected abstract void Process ();
+ /// <summary>
+ /// Adds a target block to send messages to.
+ /// </summary>
+ /// <returns>
+ /// An object that can be used to destroy the link to the added target.
+ /// </returns>
public IDisposable AddTarget (ITargetBlock<T> targetBlock, DataflowLinkOptions linkOptions)
{
if (targetBlock == null)
@@ -106,16 +137,26 @@ namespace System.Threading.Tasks.Dataflow {
return result;
}
+ /// <summary>
+ /// Makes sure the block is completed if it should be.
+ /// </summary>
protected void VerifyCompleteness ()
{
if (Outgoing.IsCompleted && externalCompleteTester ())
compHelper.Complete ();
}
+ /// <summary>
+ /// Is the block faulted or cancelled?
+ /// </summary>
protected bool IsFaultedOrCancelled {
get { return compHelper.Completion.IsFaulted || compHelper.Completion.IsCanceled; }
}
+ /// <summary>
+ /// Used to notify that object was removed from the queue
+ /// and to update counts.
+ /// </summary>
protected void DecreaseCounts (T data)
{
var modifiedCount = GetModifiedCount (data);
@@ -124,6 +165,9 @@ namespace System.Threading.Tasks.Dataflow {
decreaseItemsCount (modifiedCount);
}
+ /// <summary>
+ /// Marks the queue for completion.
+ /// </summary>
public void Complete ()
{
Outgoing.CompleteAdding ();
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutputAvailableBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutputAvailableBlock.cs
index e8b184dea3f..b53be90dcc1 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutputAvailableBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutputAvailableBlock.cs
@@ -23,8 +23,8 @@
namespace System.Threading.Tasks.Dataflow {
/// <summary>
- /// This internal block is used by the OutputAvailable methods in DataflowBlock static class
- /// to check for available items in an asynchrnousy way
+ /// This internal block is used by the <see cref="DataflowBlock.OutputAvailableAsync"/> methods
+ /// to check for available items in an asynchronous way.
/// </summary>
class OutputAvailableBlock<TOutput> : ITargetBlock<TOutput> {
readonly TaskCompletionSource<bool> completion =
@@ -48,6 +48,11 @@ namespace System.Threading.Tasks.Dataflow {
return DataflowMessageStatus.DecliningPermanently;
}
+ /// <summary>
+ /// Returns a Task that can be used to wait until output from a block is available.
+ /// </summary>
+ /// <param name="bridge">The disposable object returned by <see cref="ISourceBlock{TOutput}.LinkTo"/>.</param>
+ /// <param name="token">Cancellation token for this operation.</param>
public Task<bool> AsyncGet (IDisposable bridge, CancellationToken token)
{
linkBridge = bridge;
@@ -60,6 +65,10 @@ namespace System.Threading.Tasks.Dataflow {
return completion.Task;
}
+ /// <summary>
+ /// Called after the result has been set,
+ /// cleans up after this block.
+ /// </summary>
void CompletionSet ()
{
if (linkBridge != null) {
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs
index 7c148718a35..962a0baf387 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs
@@ -23,10 +23,11 @@
using System.Collections.Concurrent;
-namespace System.Threading.Tasks.Dataflow
-{
- internal class PassingMessageBox<TInput> : MessageBox<TInput>
- {
+namespace System.Threading.Tasks.Dataflow {
+ /// <summary>
+ /// Message box for blocks that don't need any special processing of incoming items.
+ /// </summary>
+ class PassingMessageBox<TInput> : MessageBox<TInput> {
readonly Action<bool> processQueue;
public PassingMessageBox (
@@ -40,6 +41,11 @@ namespace System.Threading.Tasks.Dataflow
this.processQueue = processQueue;
}
+ /// <summary>
+ /// Makes sure the input queue is processed the way it needs to.
+ /// Executes synchronously, so shouldn't cause any long processing.
+ /// </summary>
+ /// <param name="newItem">Was new item just added?</param>
protected override void EnsureProcessing (bool newItem)
{
processQueue (newItem);
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs
index 9e4bd1dced1..42b45d16b6c 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs
@@ -22,6 +22,9 @@
// THE SOFTWARE.
namespace System.Threading.Tasks.Dataflow {
+ /// <summary>
+ /// Block returned by <see cref="DataflowBlock.Encapsulate{TInput,TOutput}"/>.
+ /// </summary>
class PropagatorWrapperBlock<TInput, TOutput> :
IPropagatorBlock<TInput, TOutput> {
readonly ITargetBlock<TInput> targetBlock;
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ReceiveBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ReceiveBlock.cs
index c3820992a5d..e9ea6fe295c 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ReceiveBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ReceiveBlock.cs
@@ -23,8 +23,8 @@
namespace System.Threading.Tasks.Dataflow {
/// <summary>
- /// This internal block is used by the Receive methods in DataflowBlock static class
- /// to retrieve elements in either blocking or asynchronous way
+ /// This internal block is used by the <see cref="DataflowBlock.Receive"/> methods
+ /// to retrieve elements in either blocking or asynchronous way.
/// </summary>
class ReceiveBlock<TOutput> : ITargetBlock<TOutput> {
readonly TaskCompletionSource<TOutput> completion =
@@ -63,6 +63,12 @@ namespace System.Threading.Tasks.Dataflow {
return DataflowMessageStatus.Accepted;
}
+ /// <summary>
+ /// Synchronously waits until an item is available.
+ /// </summary>
+ /// <param name="bridge">The disposable object returned by <see cref="ISourceBlock{TOutput}.LinkTo"/>.</param>
+ /// <param name="token">Cancellation token for this operation.</param>
+ /// <param name="timeout">Timeout of this operation, in milliseconds.</param>
public TOutput WaitAndGet (IDisposable bridge, CancellationToken token, int timeout)
{
try {
@@ -75,6 +81,12 @@ namespace System.Threading.Tasks.Dataflow {
}
}
+ /// <summary>
+ /// Asynchronously waits until an item is available.
+ /// </summary>
+ /// <param name="bridge">The disposable object returned by <see cref="ISourceBlock{TOutput}.LinkTo"/>.</param>
+ /// <param name="token">Cancellation token for this operation.</param>
+ /// <param name="timeout">Timeout of this operation, in milliseconds.</param>
public Task<TOutput> AsyncGet (IDisposable bridge, CancellationToken token, int timeout)
{
linkBridge = bridge;
@@ -98,6 +110,10 @@ namespace System.Threading.Tasks.Dataflow {
return completion.Task;
}
+ /// <summary>
+ /// Called after the result has been set,
+ /// cleans up after this block.
+ /// </summary>
void CompletionSet ()
{
if (linkBridge != null) {
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs
index 78f6c9cf88f..ec9b08745c4 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs
@@ -46,6 +46,10 @@ namespace System.Threading.Tasks.Dataflow {
this.cancellationToken = cancellationToken;
}
+ /// <summary>
+ /// Sends the item given in the constructor to the target block.
+ /// </summary>
+ /// <returns>Task that completes when the sending is done, or can't be performed.</returns>
public Task<bool> Send ()
{
cancellationTokenRegistration = cancellationToken.Register (
@@ -60,6 +64,9 @@ namespace System.Threading.Tasks.Dataflow {
return taskCompletionSource.Task;
}
+ /// <summary>
+ /// Offers the item to the target and hadles its response.
+ /// </summary>
void PerformSend ()
{
DisableCancel ();
@@ -148,11 +155,19 @@ namespace System.Threading.Tasks.Dataflow {
return false;
}
+ /// <summary>
+ /// Temporarily disables cancelling.
+ /// </summary>
void DisableCancel ()
{
cancelDisabled = true;
}
+ /// <summary>
+ /// Enables cancelling after it was disabled.
+ /// If cancellation was attempted in the meantime,
+ /// actually performs the cancelling.
+ /// </summary>
void EnableCancel ()
{
cancelDisabled = false;
@@ -161,6 +176,9 @@ namespace System.Threading.Tasks.Dataflow {
taskCompletionSource.SetCanceled ();
}
+ /// <summary>
+ /// Sets the result of the operation.
+ /// </summary>
void SetResult (bool result)
{
cancellationTokenRegistration.Dispose ();
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetCollection.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetCollection.cs
index 0c01bc6c57b..8b49b461d1c 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetCollection.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetCollection.cs
@@ -25,7 +25,7 @@ using System.Collections.Generic;
namespace System.Threading.Tasks.Dataflow {
/// <summary>
- /// Collection of target blocks for a source block.
+ /// Base class for collection of target blocks for a source block.
/// Also handles sending messages to the target blocks.
/// </summary>
abstract class TargetCollectionBase<T> {
@@ -52,6 +52,10 @@ namespace System.Threading.Tasks.Dataflow {
Reserved = new AtomicBoolean ();
}
+ /// <summary>
+ /// Is called after a message was sent, makes sure the linked is destroyed after
+ /// <see cref="DataflowLinkOptions.MaxMessages"/> were sent.
+ /// </summary>
public void MessageSent()
{
if (remainingMessages != -1)
@@ -61,11 +65,17 @@ namespace System.Threading.Tasks.Dataflow {
}
readonly AtomicBoolean disabled = new AtomicBoolean ();
+ /// <summary>
+ /// Is the link destroyed?
+ /// </summary>
public bool Disabled
{
get { return disabled.Value; }
}
+ /// <summary>
+ /// Destroys the link to this target.
+ /// </summary>
public void Dispose ()
{
disabled.Value = true;
@@ -74,16 +84,22 @@ namespace System.Threading.Tasks.Dataflow {
cancellationTokenSource.Cancel ();
Target ignored;
- targetCollection.targetDictionary.TryRemove (TargetBlock, out ignored);
+ targetCollection.TargetDictionary.TryRemove (TargetBlock, out ignored);
// to avoid memory leak; it could take a long time
// before this object is actually removed from the collection
TargetBlock = null;
}
+ /// <summary>
+ /// Does this target have a postponed message?
+ /// </summary>
public AtomicBoolean Postponed { get; private set; }
- // used only by broadcast blocks
+ /// <summary>
+ /// Does this target have a reserved message?
+ /// </summary>
+ /// <remarks>Used only by broadcast blocks.</remarks>
public AtomicBoolean Reserved { get; private set; }
}
@@ -95,7 +111,7 @@ namespace System.Threading.Tasks.Dataflow {
readonly ConcurrentQueue<Target> appendQueue = new ConcurrentQueue<Target> ();
readonly LinkedList<Target> targets = new LinkedList<Target> ();
- protected readonly ConcurrentDictionary<ITargetBlock<T>, Target> targetDictionary =
+ protected readonly ConcurrentDictionary<ITargetBlock<T>, Target> TargetDictionary =
new ConcurrentDictionary<ITargetBlock<T>, Target> ();
// lastMessageHeaderId will be always accessed only from one thread
@@ -106,13 +122,19 @@ namespace System.Threading.Tasks.Dataflow {
bool firstOffering;
T currentItem;
- public TargetCollectionBase (ISourceBlock<T> block, bool broadcast, bool consumeToAccept)
+ protected TargetCollectionBase (ISourceBlock<T> block, bool broadcast, bool consumeToAccept)
{
this.block = block;
this.broadcast = broadcast;
this.consumeToAccept = consumeToAccept;
}
+ /// <summary>
+ /// Adds a target block to send messages to.
+ /// </summary>
+ /// <returns>
+ /// An object that can be used to destroy the link to the added target.
+ /// </returns>
public IDisposable AddTarget (ITargetBlock<T> targetBlock, DataflowLinkOptions options)
{
CancellationTokenSource cancellationTokenSource = null;
@@ -129,7 +151,7 @@ namespace System.Threading.Tasks.Dataflow {
var target = new Target (
this, targetBlock, options.MaxMessages, cancellationTokenSource);
- targetDictionary [targetBlock] = target;
+ TargetDictionary [targetBlock] = target;
if (options.Append)
appendQueue.Enqueue (target);
else
@@ -138,6 +160,9 @@ namespace System.Threading.Tasks.Dataflow {
return target;
}
+ /// <summary>
+ /// Sets the current item to be offered to targets
+ /// </summary>
public void SetCurrentItem (T item)
{
firstOffering = true;
@@ -147,18 +172,32 @@ namespace System.Threading.Tasks.Dataflow {
ClearUnpostponed ();
}
+ /// <summary>
+ /// Clears the collection of "unpostponed" targets.
+ /// </summary>
protected abstract void ClearUnpostponed ();
+ /// <summary>
+ /// Resets the current item to be offered to targets.
+ /// This means there is currently nothing to offer.
+ /// </summary>
public void ResetCurrentItem ()
{
currentItem = default(T);
Thread.VolatileWrite (ref currentMessageHeaderId, 0);
}
+ /// <summary>
+ /// Is there an item to send right now?
+ /// </summary>
public bool HasCurrentItem {
get { return Thread.VolatileRead (ref currentMessageHeaderId) != 0; }
}
+ /// <summary>
+ /// Offers the current item to all eligible targets.
+ /// </summary>
+ /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns>
public bool OfferItemToTargets ()
{
// is there an item to offer?
@@ -169,7 +208,7 @@ namespace System.Threading.Tasks.Dataflow {
do {
// order is important here, we want to make sure that prepended target
- // added after appended target is always processed first
+ // added before appended target is processed first
var appended = PrependOrAppend (false);
var prepended = PrependOrAppend (true);
@@ -192,6 +231,9 @@ namespace System.Threading.Tasks.Dataflow {
return false;
}
+ /// <summary>
+ /// Are there any targets that currently require a message to be sent to them?
+ /// </summary>
public bool NeedsProcessing {
get {
return !appendQueue.IsEmpty || !prependQueue.IsEmpty
@@ -199,8 +241,19 @@ namespace System.Threading.Tasks.Dataflow {
}
}
+ /// <summary>
+ /// Is the collection of unpostponed targets empty?
+ /// </summary>
protected abstract bool UnpostponedIsEmpty { get; }
+ /// <summary>
+ /// Prepends (appends) targets that should be prepended (appended) to the collection of targets.
+ /// </summary>
+ /// <param name="prepend"><c>true</c> to prepend, <c>false</c> to append.</param>
+ /// <returns>
+ /// Nodes that contain first and last target added to the list,
+ /// or <c>null</c> if no nodes were added.
+ /// </returns>
Tuple<LinkedListNode<Target>, LinkedListNode<Target>> PrependOrAppend (
bool prepend)
{
@@ -227,6 +280,10 @@ namespace System.Threading.Tasks.Dataflow {
: Tuple.Create (first, last);
}
+ /// <summary>
+ /// Offers the current item to the targets between the given nodes (inclusive).
+ /// </summary>
+ /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns>
bool OfferItemToTargets (
Tuple<LinkedListNode<Target>, LinkedListNode<Target>> targetPair)
{
@@ -252,8 +309,16 @@ namespace System.Threading.Tasks.Dataflow {
return false;
}
+ /// <summary>
+ /// Offers the current item to unpostponed targets.
+ /// </summary>
+ /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns>
protected abstract bool OfferItemToUnpostponed ();
+ /// <summary>
+ /// Offers the current item to the given target.
+ /// </summary>
+ /// <returns>Was the item accepted?</returns>
protected bool OfferItem (Target target)
{
if (target.Reserved.Value)
@@ -282,12 +347,18 @@ namespace System.Threading.Tasks.Dataflow {
}
}
+ /// <summary>
+ /// Returns whether the given header corresponds to the current item.
+ /// </summary>
public bool VerifyHeader (DataflowMessageHeader header)
{
return header.Id == Thread.VolatileRead (ref currentMessageHeaderId);
}
}
+ /// <summary>
+ /// Target collection for non-broadcast blocks.
+ /// </summary>
class TargetCollection<T> : TargetCollectionBase<T> {
readonly ConcurrentQueue<Target> unpostponedTargets =
new ConcurrentQueue<Target> ();
@@ -297,20 +368,32 @@ namespace System.Threading.Tasks.Dataflow {
{
}
+ /// <summary>
+ /// Is the collection of unpostponed targets empty?
+ /// </summary>
protected override bool UnpostponedIsEmpty {
get { return unpostponedTargets.IsEmpty; }
}
+ /// <summary>
+ /// Returns whether the given header corresponds to the current item
+ /// and that the given target block postponed this item.
+ /// </summary>
public bool VerifyHeader (DataflowMessageHeader header, ITargetBlock<T> targetBlock)
{
return VerifyHeader (header)
- && targetDictionary[targetBlock].Postponed.Value;
+ && TargetDictionary[targetBlock].Postponed.Value;
}
+ /// <summary>
+ /// Unpostpones the given target.
+ /// </summary>
+ /// <param name="targetBlock">Target to unpostpone.</param>
+ /// <param name="messageConsumed">Did the target consume an item?</param>
public void UnpostponeTarget (ITargetBlock<T> targetBlock, bool messageConsumed)
{
Target target;
- if (!targetDictionary.TryGetValue (targetBlock, out target))
+ if (!TargetDictionary.TryGetValue (targetBlock, out target))
return;
if (messageConsumed)
@@ -320,6 +403,9 @@ namespace System.Threading.Tasks.Dataflow {
target.Postponed.Value = false;
}
+ /// <summary>
+ /// Clears the collection of "unpostponed" targets.
+ /// </summary>
protected override void ClearUnpostponed ()
{
Target ignored;
@@ -327,6 +413,10 @@ namespace System.Threading.Tasks.Dataflow {
}
}
+ /// <summary>
+ /// Offers the current item to unpostponed targets.
+ /// </summary>
+ /// <returns>Was the item accepted?</returns>
protected override bool OfferItemToUnpostponed ()
{
Target target;
@@ -339,6 +429,9 @@ namespace System.Threading.Tasks.Dataflow {
}
}
+ /// <summary>
+ /// Target collection for broadcast blocks.
+ /// </summary>
class BroadcastTargetCollection<T> : TargetCollectionBase<T> {
// it's necessary to store the headers because of a race between
// UnpostponeTargetConsumed and SetCurrentItem
@@ -351,19 +444,30 @@ namespace System.Threading.Tasks.Dataflow {
{
}
+ /// <summary>
+ /// Is the collection of unpostponed targets empty?
+ /// </summary>
protected override bool UnpostponedIsEmpty {
get { return unpostponedTargets.IsEmpty; }
}
+ /// <summary>
+ /// Marks the target as having a reserved message.
+ /// </summary>
public void ReserveTarget (ITargetBlock<T> targetBlock)
{
- targetDictionary [targetBlock].Reserved.Value = true;
+ TargetDictionary [targetBlock].Reserved.Value = true;
}
+ /// <summary>
+ /// Unpostpone target after it consumed a message.
+ /// </summary>
+ /// <param name="targetBlock">The target to unpostpone.</param>
+ /// <param name="header">Header of the message the target consumed.</param>
public void UnpostponeTargetConsumed (ITargetBlock<T> targetBlock,
DataflowMessageHeader header)
{
- Target target = targetDictionary [targetBlock];
+ Target target = TargetDictionary [targetBlock];
target.MessageSent ();
unpostponedTargets.Enqueue (Tuple.Create (target, header));
@@ -372,10 +476,13 @@ namespace System.Threading.Tasks.Dataflow {
target.Reserved.Value = false;
}
+ /// <summary>
+ /// Unpostpone target in the case when it didn't successfuly consume a message.
+ /// </summary>
public void UnpostponeTargetNotConsumed (ITargetBlock<T> targetBlock)
{
Target target;
- if (!targetDictionary.TryGetValue (targetBlock, out target))
+ if (!TargetDictionary.TryGetValue (targetBlock, out target))
return;
unpostponedTargets.Enqueue (Tuple.Create (target,
@@ -385,6 +492,9 @@ namespace System.Threading.Tasks.Dataflow {
target.Reserved.Value = false;
}
+ /// <summary>
+ /// Clears the collection of "unpostponed" targets.
+ /// </summary>
protected override void ClearUnpostponed ()
{
Tuple<Target, DataflowMessageHeader> ignored;
@@ -392,6 +502,10 @@ namespace System.Threading.Tasks.Dataflow {
}
}
+ /// <summary>
+ /// Offers the current item to unpostponed targets.
+ /// </summary>
+ /// <returns>Always <c>false</c>.</returns>
protected override bool OfferItemToUnpostponed ()
{
Tuple<Target, DataflowMessageHeader> tuple;
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs
index 7349a084042..a1c37f4dce3 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs
@@ -129,6 +129,10 @@ namespace System.Threading.Tasks.Dataflow {
return outgoing.TryReceiveAll (out items);
}
+ /// <summary>
+ /// Transforms one item from the queue if the transform delegate is synchronous.
+ /// </summary>
+ /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue is empty.</returns>
bool TransformProcess ()
{
TInput input;
@@ -140,6 +144,11 @@ namespace System.Threading.Tasks.Dataflow {
return dequeued;
}
+ /// <summary>
+ /// Processes one item from the queue if the transform delegate is asynchronous.
+ /// </summary>
+ /// <param name="task">The Task that was returned by the synchronous part of the delegate.</param>
+ /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue was empty.</returns>
bool AsyncTransformProcess (out Task<TOutput> task)
{
TInput input;
@@ -153,6 +162,9 @@ namespace System.Threading.Tasks.Dataflow {
return dequeued;
}
+ /// <summary>
+ /// Process result of finished asynchronous transformation.
+ /// </summary>
void AsyncProcessFinishedTask (Task<TOutput> task)
{
if (task == null || task.IsCanceled)
@@ -172,21 +184,15 @@ namespace System.Threading.Tasks.Dataflow {
}
public Task Completion {
- get {
- return compHelper.Completion;
- }
+ get { return compHelper.Completion; }
}
public int OutputCount {
- get {
- return outgoing.Count;
- }
+ get { return outgoing.Count; }
}
public int InputCount {
- get {
- return messageQueue.Count;
- }
+ get { return messageQueue.Count; }
}
public override string ToString ()
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs
index 194d228f569..7bb465728a5 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs
@@ -127,6 +127,10 @@ namespace System.Threading.Tasks.Dataflow {
return outgoing.TryReceiveAll (out items);
}
+ /// <summary>
+ /// Transforms one item from the queue if the transform delegate is synchronous.
+ /// </summary>
+ /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue is empty.</returns>
bool TransformProcess ()
{
TInput input;
@@ -141,6 +145,9 @@ namespace System.Threading.Tasks.Dataflow {
return dequeued;
}
+ /// <summary>
+ /// Adds the transformed collection to the output queue.
+ /// </summary>
void EnqueueTransformed (IEnumerable<TOutput> transformed)
{
bool first = true;
@@ -157,6 +164,11 @@ namespace System.Threading.Tasks.Dataflow {
messageBox.DecreaseCount ();
}
+ /// <summary>
+ /// Processes one item from the queue if the transform delegate is asynchronous.
+ /// </summary>
+ /// <param name="task">The Task that was returned by the synchronous part of the delegate.</param>
+ /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue was empty.</returns>
bool AsyncTransformProcess (out Task<IEnumerable<TOutput>> task)
{
TInput input;
@@ -170,6 +182,9 @@ namespace System.Threading.Tasks.Dataflow {
return dequeued;
}
+ /// <summary>
+ /// Process result of finished asynchronous transformation.
+ /// </summary>
void ProcessFinishedTask (Task<IEnumerable<TOutput>> task)
{
if (task == null || task.IsCanceled)
@@ -189,21 +204,15 @@ namespace System.Threading.Tasks.Dataflow {
}
public Task Completion {
- get {
- return compHelper.Completion;
- }
+ get { return compHelper.Completion; }
}
public int OutputCount {
- get {
- return outgoing.Count;
- }
+ get { return outgoing.Count; }
}
public int InputCount {
- get {
- return messageQueue.Count;
- }
+ get { return messageQueue.Count; }
}
public override string ToString ()
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
index 0711bed8045..6da70460598 100644
--- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
+++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
@@ -116,6 +116,9 @@ namespace System.Threading.Tasks.Dataflow {
return true;
}
+ /// <summary>
+ /// Moves an item from the input queue to the output queue.
+ /// </summary>
void BroadcastProcess ()
{
T item;