diff options
author | Petr Onderka <gsvick@gmail.com> | 2012-08-19 22:34:18 +0400 |
---|---|---|
committer | Petr Onderka <gsvick@gmail.com> | 2012-08-20 02:09:05 +0400 |
commit | 2c25c41e032eda21b18b09455b8df1077c4d1075 (patch) | |
tree | f41d1ffe783b6b8798954b41f2e2642cbfa8be8d /mcs/class/System.Threading.Tasks.Dataflow | |
parent | 06bcfa044fd178e00f397cafac82b6c673b3d667 (diff) |
Finished adding documentation for non-public types and methods
Diffstat (limited to 'mcs/class/System.Threading.Tasks.Dataflow')
25 files changed, 479 insertions, 151 deletions
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs index 20a3a23cf59..312e5f28038 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs @@ -241,7 +241,7 @@ namespace System.Threading.Tasks.Dataflow { } /// <summary> - /// Creates a batch of the given size and adds the resulting batch to the output queue. + /// Creates a batch of the given size and adds the result to the output queue. /// </summary> void MakeBatch (int size) { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs index e877c36134a..b2e2884502b 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs @@ -84,7 +84,7 @@ namespace System.Threading.Tasks.Dataflow { /// <summary> /// Takes an item from the queue and sets it as <see cref="CurrentItem"/>. /// </summary> - public void DequeueItem() + public void DequeueItem () { T item; if (Outgoing.TryTake (out item)) { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs index 36f2eaedce9..bf2d0b03dd6 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs @@ -22,7 +22,13 @@ // THE SOFTWARE. namespace System.Threading.Tasks.Dataflow { + /// <summary> + /// Block used in all versions of <see cref="DataflowBlock.Choose"/>. + /// </summary> class ChooserBlock<T1, T2, T3> { + /// <summary> + /// Target for one of the sources to choose from. + /// </summary> class ChooseTarget<TMessage> : ITargetBlock<TMessage> { readonly ChooserBlock<T1, T2, T3> chooserBlock; readonly int index; @@ -97,6 +103,11 @@ namespace System.Threading.Tasks.Dataflow { dataflowBlockOptions.CancellationToken.Register (Cancelled); } + /// <summary> + /// Causes cancellation of <see cref="Completion"/>. + /// If a message is already being consumed (and the consumsing succeeds) + /// or if its action is being invoked, the Task is not cancelled. + /// </summary> void Cancelled () { if (!canAccept) @@ -117,11 +128,19 @@ namespace System.Threading.Tasks.Dataflow { } } + /// <summary> + /// Called when all sources have completed, + /// causes cancellation of <see cref="Completion"/>. + /// </summary> public void AllSourcesCompleted () { Cancelled (); } + /// <summary> + /// Called when message has arrived (and was consumed, if necessary). + /// This method can be called only once in the lifetime of this object. + /// </summary> void MessageArrived<TMessage> ( int index, Action<TMessage> action, TMessage value) { @@ -133,12 +152,26 @@ namespace System.Threading.Tasks.Dataflow { } } + /// <summary> + /// Target block for the first source block. + /// </summary> public ITargetBlock<T1> Target1 { get; private set; } + /// <summary> + /// Target block for the second source block. + /// </summary> public ITargetBlock<T2> Target2 { get; private set; } + /// <summary> + /// Target block for the third source block. + /// Is <c>null</c> if there are only two actions. + /// </summary> public ITargetBlock<T3> Target3 { get; private set; } + /// <summary> + /// Task that signifies that an item was accepted and + /// its action has been called. + /// </summary> public Task<int> Completion { get { return completion.Task; } } diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlockOptions.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlockOptions.cs index c24f718345a..608e22f1425 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlockOptions.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlockOptions.cs @@ -26,6 +26,9 @@ namespace System.Threading.Tasks.Dataflow { static readonly DataflowBlockOptions DefaultOptions = new DataflowBlockOptions (); + /// <summary> + /// Cached default block options + /// </summary> internal static DataflowBlockOptions Default { get { return DefaultOptions; } } diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutionDataflowBlockOptions.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutionDataflowBlockOptions.cs index 7506b3fd22e..2b5658208c7 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutionDataflowBlockOptions.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutionDataflowBlockOptions.cs @@ -28,6 +28,9 @@ namespace System.Threading.Tasks.Dataflow { int maxDegreeOfParallelism; + /// <summary> + /// Cached default block options + /// </summary> internal static new ExecutionDataflowBlockOptions Default { get { return DefaultOptions; } } diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs index 99cfa23638f..f7a24d4b6fe 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs @@ -29,6 +29,9 @@ namespace System.Threading.Tasks.Dataflow long maxNumberOfGroups; + /// <summary> + /// Cached default block options + /// </summary> internal static new GroupingDataflowBlockOptions Default { get { return DefaultOptions; } } diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IDataflowBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IDataflowBlock.cs index 3ae427ca8c0..a6e69b5630e 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IDataflowBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IDataflowBlock.cs @@ -19,23 +19,12 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// -// - -using System; -using System.Threading.Tasks; - -namespace System.Threading.Tasks.Dataflow -{ - public interface IDataflowBlock - { - Task Completion { - get; - } +namespace System.Threading.Tasks.Dataflow { + public interface IDataflowBlock { + Task Completion { get; } void Complete (); void Fault (Exception exception); } -} - +}
\ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IReceivableSourceBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IReceivableSourceBlock.cs index 2bb39a21b51..dc35f58cfbb 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IReceivableSourceBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/IReceivableSourceBlock.cs @@ -19,20 +19,12 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// -// - -using System; -using System.Threading.Tasks; using System.Collections.Generic; -namespace System.Threading.Tasks.Dataflow -{ - public interface IReceivableSourceBlock<TOutput> : ISourceBlock<TOutput>, IDataflowBlock - { +namespace System.Threading.Tasks.Dataflow { + public interface IReceivableSourceBlock<TOutput> : ISourceBlock<TOutput> { bool TryReceive (Predicate<TOutput> filter, out TOutput item); bool TryReceiveAll (out IList<TOutput> items); - } -} - + } +}
\ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs index 1117926dcd9..fb424fc0e8b 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs @@ -52,10 +52,10 @@ namespace System.Threading.Tasks.Dataflow this.dataflowBlockOptions = dataflowBlockOptions; compHelper = CompletionHelper.GetNew (dataflowBlockOptions); - target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper, + target1 = new JoinTarget<T1> (this, SignalArrivalTarget, compHelper, () => outgoing.IsCompleted, dataflowBlockOptions, dataflowBlockOptions.Greedy, TryAdd1); - target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper, + target2 = new JoinTarget<T2> (this, SignalArrivalTarget, compHelper, () => outgoing.IsCompleted, dataflowBlockOptions, dataflowBlockOptions.Greedy, TryAdd2); outgoing = new OutgoingQueue<Tuple<T1, T2>> (this, compHelper, @@ -114,11 +114,13 @@ namespace System.Threading.Tasks.Dataflow } public Task Completion { - get { - return compHelper.Completion; - } + get { return compHelper.Completion; } } + /// <summary> + /// Returns whether a new item can be accepted by the first target, + /// and increments a counter if it can. + /// </summary> bool TryAdd1 () { return dataflowBlockOptions.MaxNumberOfGroups == -1 @@ -126,6 +128,10 @@ namespace System.Threading.Tasks.Dataflow <= dataflowBlockOptions.MaxNumberOfGroups; } + /// <summary> + /// Returns whether a new item can be accepted by the second target, + /// and increments a counter if it can. + /// </summary> bool TryAdd2 () { return dataflowBlockOptions.MaxNumberOfGroups == -1 @@ -133,7 +139,10 @@ namespace System.Threading.Tasks.Dataflow <= dataflowBlockOptions.MaxNumberOfGroups; } - void SignalArrivalTargetImpl() + /// <summary> + /// Decides whether to create a new tuple or not. + /// </summary> + void SignalArrivalTarget () { if (dataflowBlockOptions.Greedy) { bool taken = false; @@ -160,6 +169,9 @@ namespace System.Threading.Tasks.Dataflow } } + /// <summary> + /// Returns whether non-greedy creation of a tuple should be started. + /// </summary> bool ShouldProcessNonGreedy () { return target1.PostponedMessagesCount >= 1 @@ -168,6 +180,9 @@ namespace System.Threading.Tasks.Dataflow || outgoing.Count < dataflowBlockOptions.BoundedCapacity); } + /// <summary> + /// Starts non-greedy creation of tuples, if one doesn't already run. + /// </summary> void EnsureNonGreedyProcessing () { if (nonGreedyProcessing.TrySet ()) @@ -177,6 +192,10 @@ namespace System.Threading.Tasks.Dataflow dataflowBlockOptions.TaskScheduler); } + /// <summary> + /// Creates tuples in non-greedy mode, + /// making sure the whole tuple is available by using reservations. + /// </summary> void NonGreedyProcess() { while (ShouldProcessNonGreedy ()) { @@ -204,6 +223,9 @@ namespace System.Threading.Tasks.Dataflow } + /// <summary> + /// Creates a tuple from the given values and adds the result to the output queue. + /// </summary> void TriggerMessage (T1 val1, T2 val2) { outgoing.AddData (Tuple.Create (val1, val2)); @@ -215,15 +237,11 @@ namespace System.Threading.Tasks.Dataflow } public ITargetBlock<T1> Target1 { - get { - return target1; - } + get { return target1; } } public ITargetBlock<T2> Target2 { - get { - return target2; - } + get { return target2; } } public int OutputCount { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs index 6b0800ce394..419beb75c60 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs @@ -55,13 +55,13 @@ namespace System.Threading.Tasks.Dataflow this.dataflowBlockOptions = dataflowBlockOptions; this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions); - target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper, + target1 = new JoinTarget<T1> (this, SignalArrivalTarget, compHelper, () => outgoing.IsCompleted, dataflowBlockOptions, dataflowBlockOptions.Greedy, TryAdd1); - target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper, + target2 = new JoinTarget<T2> (this, SignalArrivalTarget, compHelper, () => outgoing.IsCompleted, dataflowBlockOptions, dataflowBlockOptions.Greedy, TryAdd2); - target3 = new JoinTarget<T3> (this, SignalArrivalTargetImpl, compHelper, + target3 = new JoinTarget<T3> (this, SignalArrivalTarget, compHelper, () => outgoing.IsCompleted, dataflowBlockOptions, dataflowBlockOptions.Greedy, TryAdd3); outgoing = new OutgoingQueue<Tuple<T1, T2, T3>> ( @@ -124,11 +124,13 @@ namespace System.Threading.Tasks.Dataflow } public Task Completion { - get { - return compHelper.Completion; - } + get { return compHelper.Completion; } } + /// <summary> + /// Returns whether a new item can be accepted by the first target, + /// and increments a counter if it can. + /// </summary> bool TryAdd1 () { return dataflowBlockOptions.MaxNumberOfGroups == -1 @@ -136,6 +138,10 @@ namespace System.Threading.Tasks.Dataflow <= dataflowBlockOptions.MaxNumberOfGroups; } + /// <summary> + /// Returns whether a new item can be accepted by the second target, + /// and increments a counter if it can. + /// </summary> bool TryAdd2 () { return dataflowBlockOptions.MaxNumberOfGroups == -1 @@ -143,13 +149,21 @@ namespace System.Threading.Tasks.Dataflow <= dataflowBlockOptions.MaxNumberOfGroups; } + /// <summary> + /// Returns whether a new item can be accepted by the third target, + /// and increments a counter if it can. + /// </summary> bool TryAdd3 () { return dataflowBlockOptions.MaxNumberOfGroups == -1 || Interlocked.Increment (ref target3Count) <= dataflowBlockOptions.MaxNumberOfGroups; } - void SignalArrivalTargetImpl () + + /// <summary> + /// Decides whether to create a new tuple or not. + /// </summary> + void SignalArrivalTarget () { if (dataflowBlockOptions.Greedy) { bool taken = false; @@ -179,6 +193,9 @@ namespace System.Threading.Tasks.Dataflow } } + /// <summary> + /// Returns whether non-greedy creation of a tuple should be started. + /// </summary> bool ShouldProcesNonGreedy () { return target1.PostponedMessagesCount >= 1 @@ -188,6 +205,9 @@ namespace System.Threading.Tasks.Dataflow || outgoing.Count < dataflowBlockOptions.BoundedCapacity); } + /// <summary> + /// Starts non-greedy creation of tuples, if one doesn't already run. + /// </summary> void EnsureNonGreedyProcessing () { if (nonGreedyProcessing.TrySet()) @@ -197,6 +217,10 @@ namespace System.Threading.Tasks.Dataflow dataflowBlockOptions.TaskScheduler); } + /// <summary> + /// Creates tuples in non-greedy mode, + /// making sure the whole tuple is available by using reservations. + /// </summary> void NonGreedyProcess () { while (ShouldProcesNonGreedy ()) { @@ -231,6 +255,9 @@ namespace System.Threading.Tasks.Dataflow EnsureNonGreedyProcessing (); } + /// <summary> + /// Creates a tuple from the given values and adds the result to the output queue. + /// </summary> void TriggerMessage (T1 val1, T2 val2, T3 val3) { outgoing.AddData (Tuple.Create (val1, val2, val3)); @@ -242,21 +269,15 @@ namespace System.Threading.Tasks.Dataflow } public ITargetBlock<T1> Target1 { - get { - return target1; - } + get { return target1; } } public ITargetBlock<T2> Target2 { - get { - return target2; - } + get { return target2; } } public ITargetBlock<T3> Target3 { - get { - return target3; - } + get { return target3; } } public int OutputCount { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs index 8cceeddbd0a..48f293e1a47 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs @@ -1,6 +1,7 @@ -// JoinBlock.cs +// JoinBlock.cs // -// Copyright (c) 2011 Jérémie "garuma" Laval +// Copyright (c) 2011 Jérémie "garuma" Laval +// Copyright (c) 2012 Petr Onderka // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -23,8 +24,11 @@ using System.Collections.Concurrent; namespace System.Threading.Tasks.Dataflow { - internal class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget> - { + /// <summary> + /// Target block use by join blocks in their TargetN properties. + /// Also serves as its own <see cref="MessageBox{TInput}"/>. + /// </summary> + class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget> { readonly IDataflowBlock joinBlock; readonly Action signal; @@ -40,21 +44,25 @@ namespace System.Threading.Tasks.Dataflow { Target = this; } + /// <summary> + /// Makes sure the input queue is processed the way it needs to, + /// by signaling the parent join block. + /// </summary> protected override void EnsureProcessing (bool newItem) { signal (); } + /// <summary> + /// The input queue of this block. + /// </summary> public BlockingCollection<TTarget> Buffer { - get { - return MessageQueue; - } + get { return MessageQueue; } } - DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader, - TTarget messageValue, - ISourceBlock<TTarget> source, - bool consumeToAccept) + DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage ( + DataflowMessageHeader messageHeader, TTarget messageValue, + ISourceBlock<TTarget> source, bool consumeToAccept) { return OfferMessage (messageHeader, messageValue, source, consumeToAccept); } @@ -65,9 +73,7 @@ namespace System.Threading.Tasks.Dataflow { } Task IDataflowBlock.Completion { - get { - throw new NotSupportedException(); - } + get { throw new NotSupportedException (); } } void IDataflowBlock.Fault (Exception exception) diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs index 14ab8541fe8..239a610f727 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs @@ -146,11 +146,21 @@ namespace System.Threading.Tasks.Dataflow { return DataflowMessageStatus.Accepted; } + /// <summary> + /// Increses the count of items in the block by 1. + /// </summary> public void IncreaseCount () { Interlocked.Increment (ref itemCount); } + /// <summary> + /// Decreses the number of items in the block by the given count. + /// </summary> + /// <remarks> + /// The <paramref name="count"/> parameter is used when one object + /// can represent many items, like a batch in <see cref="BatchBlock{T}"/>. + /// </remarks> public void DecreaseCount (int count = 1) { int decreased = Interlocked.Add (ref itemCount, -count); @@ -164,10 +174,22 @@ namespace System.Threading.Tasks.Dataflow { } } + /// <summary> + /// The number of messages that were postponed + /// and can be attempted to be consumed. + /// </summary> public int PostponedMessagesCount { get { return postponedMessages.Count; } } + /// <summary> + /// Reserves a message from those that were postponed. + /// Does not guarantee any order of the messages being reserved. + /// </summary> + /// <returns> + /// An object representing the reservation on success, + /// <c>null</c> on failure. + /// </returns> public Tuple<ISourceBlock<TInput>, DataflowMessageHeader> ReserveMessage() { while (!postponedMessages.IsEmpty) { @@ -193,11 +215,17 @@ namespace System.Threading.Tasks.Dataflow { return null; } + /// <summary> + /// Releases the given reservation. + /// </summary> public void RelaseReservation(Tuple<ISourceBlock<TInput>, DataflowMessageHeader> reservation) { reservation.Item1.ReleaseReservation (reservation.Item2, Target); } + /// <summary> + /// Consumes previously reserved item. + /// </summary> public TInput ConsumeReserved(Tuple<ISourceBlock<TInput>, DataflowMessageHeader> reservation) { bool consumed; @@ -205,6 +233,11 @@ namespace System.Threading.Tasks.Dataflow { reservation.Item2, Target, out consumed); } + /// <summary> + /// Makes sure retrieving items that were postponed, + /// because they would exceed <see cref="DataflowBlockOptions.BoundedCapacity"/>, + /// is currently running. + /// </summary> void EnsurePostponedProcessing () { if (postponedProcessing.TrySet()) @@ -212,6 +245,10 @@ namespace System.Threading.Tasks.Dataflow { TaskCreationOptions.PreferFairness, options.TaskScheduler); } + /// <summary> + /// Retrieves items that were postponed, + /// because they would exceed <see cref="DataflowBlockOptions.BoundedCapacity"/>. + /// </summary> void RetrievePostponed () { // BoundedCapacity can't be -1 here, because in that case there would be no postponing @@ -261,6 +298,10 @@ namespace System.Threading.Tasks.Dataflow { /// <param name="newItem">Was new item just added?</param> protected abstract void EnsureProcessing (bool newItem); + /// <summary> + /// Completes the box, no new messages will be accepted. + /// Also starts the process of completing the output queue. + /// </summary> public void Complete () { // Make message queue complete diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObservableDataflowBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObservableDataflowBlock.cs index 64724944281..e82d1306b4a 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObservableDataflowBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObservableDataflowBlock.cs @@ -1,6 +1,7 @@ // ObservableDataflowBlock.cs // // Copyright (c) 2011 Jérémie "garuma" Laval +// Copyright (c) 2012 Petr Onderka // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -19,22 +20,14 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// -// - -using System; -using System.Threading.Tasks; -using System.Collections.Generic; -using System.Collections.Concurrent; - -namespace System.Threading.Tasks.Dataflow -{ - internal class ObservableDataflowBlock<TSource> : IObservable<TSource> - { - class ObserverWrapper : ITargetBlock<TSource> - { - IObserver<TSource> observer; +namespace System.Threading.Tasks.Dataflow { + /// <summary> + /// Rx Observable that represents a source block. + /// </summary> + class ObservableDataflowBlock<TSource> : IObservable<TSource> { + class ObserverWrapper : ITargetBlock<TSource> { + readonly IObserver<TSource> observer; public ObserverWrapper (IObserver<TSource> observer) { @@ -52,15 +45,12 @@ namespace System.Threading.Tasks.Dataflow } public Task Completion { - get { - return null; - } + get { return null; } } - public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader, - TSource messageValue, - ISourceBlock<TSource> source, - bool consumeToAccept) + public DataflowMessageStatus OfferMessage ( + DataflowMessageHeader messageHeader, TSource messageValue, + ISourceBlock<TSource> source, bool consumeToAccept) { if (consumeToAccept) { if (!source.ReserveMessage (messageHeader, this)) @@ -77,7 +67,7 @@ namespace System.Threading.Tasks.Dataflow } } - ISourceBlock<TSource> source; + readonly ISourceBlock<TSource> source; public ObservableDataflowBlock (ISourceBlock<TSource> source) { @@ -86,9 +76,8 @@ namespace System.Threading.Tasks.Dataflow public IDisposable Subscribe (IObserver<TSource> observer) { - ObserverWrapper wrapper = new ObserverWrapper (observer); + var wrapper = new ObserverWrapper (observer); return source.LinkTo (wrapper); } } -} - +}
\ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObserverDataflowBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObserverDataflowBlock.cs index f198dc51c38..55f3b6e8e4f 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObserverDataflowBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObserverDataflowBlock.cs @@ -1,6 +1,7 @@ // ObserverDataflowBlock.cs // // Copyright (c) 2011 Jérémie "garuma" Laval +// Copyright (c) 2012 Petr Onderka // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -19,20 +20,13 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// -// - -using System; -using System.Threading.Tasks; -using System.Collections.Generic; -using System.Collections.Concurrent; - -namespace System.Threading.Tasks.Dataflow -{ - internal class ObserverDataflowBlock<TInput> : IObserver<TInput> - { - ITargetBlock<TInput> target; +namespace System.Threading.Tasks.Dataflow { + /// <summary> + /// Rx Observer that represents a target block. + /// </summary> + class ObserverDataflowBlock<TInput> : IObserver<TInput> { + readonly ITargetBlock<TInput> target; public ObserverDataflowBlock (ITargetBlock<TInput> target) { @@ -54,5 +48,4 @@ namespace System.Threading.Tasks.Dataflow target.Post (value); } } -} - +}
\ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueue.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueue.cs index 2740df5245a..aa273ab2f16 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueue.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueue.cs @@ -49,6 +49,9 @@ namespace System.Threading.Tasks.Dataflow { this.countSelector = countSelector; } + /// <summary> + /// Calculates the count of items in the given object. + /// </summary> protected override int GetModifiedCount(T data) { if (countSelector == null) @@ -57,6 +60,9 @@ namespace System.Threading.Tasks.Dataflow { return countSelector (data); } + /// <summary> + /// Sends messages to targets. + /// </summary> protected override void Process () { bool processed; @@ -194,6 +200,9 @@ namespace System.Threading.Tasks.Dataflow { EnsureProcessing (); } + /// <summary> + /// Notifies that the first item in the queue changed. + /// </summary> void FirstItemChanged () { T firstItem; diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueueBase.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueueBase.cs index 1c96728a0ba..b1c1c294870 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueueBase.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutgoingQueueBase.cs @@ -25,7 +25,7 @@ using System.Collections.Concurrent; namespace System.Threading.Tasks.Dataflow { /// <summary> - /// This class handles outgoing message that get queued when there is no + /// Handles outgoing messages that get queued when there is no /// block on the other end to proces it. It also allows receive operations. /// </summary> abstract class OutgoingQueueBase<T> { @@ -54,19 +54,35 @@ namespace System.Threading.Tasks.Dataflow { this.decreaseItemsCount = decreaseItemsCount; } + /// <summary> + /// Is the queue completed? + /// Queue is completed after <see cref="Complete"/> is called + /// and all items are retrieved from it. + /// </summary> public bool IsCompleted { get { return Outgoing.IsCompleted; } } + /// <summary> + /// Current number of items in the queue. + /// Item are counted the way <see cref="DataflowBlockOptions.BoundedCapacity"/> + /// counts them, e.g. each item in a batch counts, even if batch is a single object. + /// </summary> public int Count { get { return totalModifiedCount; } } + /// <summary> + /// Calculates the count of items in the given object. + /// </summary> protected virtual int GetModifiedCount (T data) { return 1; } + /// <summary> + /// Adds an object to the queue. + /// </summary> public void AddData (T data) { try { @@ -79,6 +95,9 @@ namespace System.Threading.Tasks.Dataflow { } } + /// <summary> + /// Makes sure sending messages to targets is running. + /// </summary> protected void EnsureProcessing () { ForceProcessing = true; @@ -87,13 +106,25 @@ namespace System.Threading.Tasks.Dataflow { TaskCreationOptions.PreferFairness, options.TaskScheduler); } + /// <summary> + /// Indicates whether sending messages should be forced to start. + /// </summary> protected bool ForceProcessing { get { return forceProcessing; } set { forceProcessing = value; } } + /// <summary> + /// Sends messages to targets. + /// </summary> protected abstract void Process (); + /// <summary> + /// Adds a target block to send messages to. + /// </summary> + /// <returns> + /// An object that can be used to destroy the link to the added target. + /// </returns> public IDisposable AddTarget (ITargetBlock<T> targetBlock, DataflowLinkOptions linkOptions) { if (targetBlock == null) @@ -106,16 +137,26 @@ namespace System.Threading.Tasks.Dataflow { return result; } + /// <summary> + /// Makes sure the block is completed if it should be. + /// </summary> protected void VerifyCompleteness () { if (Outgoing.IsCompleted && externalCompleteTester ()) compHelper.Complete (); } + /// <summary> + /// Is the block faulted or cancelled? + /// </summary> protected bool IsFaultedOrCancelled { get { return compHelper.Completion.IsFaulted || compHelper.Completion.IsCanceled; } } + /// <summary> + /// Used to notify that object was removed from the queue + /// and to update counts. + /// </summary> protected void DecreaseCounts (T data) { var modifiedCount = GetModifiedCount (data); @@ -124,6 +165,9 @@ namespace System.Threading.Tasks.Dataflow { decreaseItemsCount (modifiedCount); } + /// <summary> + /// Marks the queue for completion. + /// </summary> public void Complete () { Outgoing.CompleteAdding (); diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutputAvailableBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutputAvailableBlock.cs index e8b184dea3f..b53be90dcc1 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutputAvailableBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/OutputAvailableBlock.cs @@ -23,8 +23,8 @@ namespace System.Threading.Tasks.Dataflow { /// <summary> - /// This internal block is used by the OutputAvailable methods in DataflowBlock static class - /// to check for available items in an asynchrnousy way + /// This internal block is used by the <see cref="DataflowBlock.OutputAvailableAsync"/> methods + /// to check for available items in an asynchronous way. /// </summary> class OutputAvailableBlock<TOutput> : ITargetBlock<TOutput> { readonly TaskCompletionSource<bool> completion = @@ -48,6 +48,11 @@ namespace System.Threading.Tasks.Dataflow { return DataflowMessageStatus.DecliningPermanently; } + /// <summary> + /// Returns a Task that can be used to wait until output from a block is available. + /// </summary> + /// <param name="bridge">The disposable object returned by <see cref="ISourceBlock{TOutput}.LinkTo"/>.</param> + /// <param name="token">Cancellation token for this operation.</param> public Task<bool> AsyncGet (IDisposable bridge, CancellationToken token) { linkBridge = bridge; @@ -60,6 +65,10 @@ namespace System.Threading.Tasks.Dataflow { return completion.Task; } + /// <summary> + /// Called after the result has been set, + /// cleans up after this block. + /// </summary> void CompletionSet () { if (linkBridge != null) { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs index 7c148718a35..962a0baf387 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs @@ -23,10 +23,11 @@ using System.Collections.Concurrent; -namespace System.Threading.Tasks.Dataflow -{ - internal class PassingMessageBox<TInput> : MessageBox<TInput> - { +namespace System.Threading.Tasks.Dataflow { + /// <summary> + /// Message box for blocks that don't need any special processing of incoming items. + /// </summary> + class PassingMessageBox<TInput> : MessageBox<TInput> { readonly Action<bool> processQueue; public PassingMessageBox ( @@ -40,6 +41,11 @@ namespace System.Threading.Tasks.Dataflow this.processQueue = processQueue; } + /// <summary> + /// Makes sure the input queue is processed the way it needs to. + /// Executes synchronously, so shouldn't cause any long processing. + /// </summary> + /// <param name="newItem">Was new item just added?</param> protected override void EnsureProcessing (bool newItem) { processQueue (newItem); diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs index 9e4bd1dced1..42b45d16b6c 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs @@ -22,6 +22,9 @@ // THE SOFTWARE. namespace System.Threading.Tasks.Dataflow { + /// <summary> + /// Block returned by <see cref="DataflowBlock.Encapsulate{TInput,TOutput}"/>. + /// </summary> class PropagatorWrapperBlock<TInput, TOutput> : IPropagatorBlock<TInput, TOutput> { readonly ITargetBlock<TInput> targetBlock; diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ReceiveBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ReceiveBlock.cs index c3820992a5d..e9ea6fe295c 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ReceiveBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ReceiveBlock.cs @@ -23,8 +23,8 @@ namespace System.Threading.Tasks.Dataflow { /// <summary> - /// This internal block is used by the Receive methods in DataflowBlock static class - /// to retrieve elements in either blocking or asynchronous way + /// This internal block is used by the <see cref="DataflowBlock.Receive"/> methods + /// to retrieve elements in either blocking or asynchronous way. /// </summary> class ReceiveBlock<TOutput> : ITargetBlock<TOutput> { readonly TaskCompletionSource<TOutput> completion = @@ -63,6 +63,12 @@ namespace System.Threading.Tasks.Dataflow { return DataflowMessageStatus.Accepted; } + /// <summary> + /// Synchronously waits until an item is available. + /// </summary> + /// <param name="bridge">The disposable object returned by <see cref="ISourceBlock{TOutput}.LinkTo"/>.</param> + /// <param name="token">Cancellation token for this operation.</param> + /// <param name="timeout">Timeout of this operation, in milliseconds.</param> public TOutput WaitAndGet (IDisposable bridge, CancellationToken token, int timeout) { try { @@ -75,6 +81,12 @@ namespace System.Threading.Tasks.Dataflow { } } + /// <summary> + /// Asynchronously waits until an item is available. + /// </summary> + /// <param name="bridge">The disposable object returned by <see cref="ISourceBlock{TOutput}.LinkTo"/>.</param> + /// <param name="token">Cancellation token for this operation.</param> + /// <param name="timeout">Timeout of this operation, in milliseconds.</param> public Task<TOutput> AsyncGet (IDisposable bridge, CancellationToken token, int timeout) { linkBridge = bridge; @@ -98,6 +110,10 @@ namespace System.Threading.Tasks.Dataflow { return completion.Task; } + /// <summary> + /// Called after the result has been set, + /// cleans up after this block. + /// </summary> void CompletionSet () { if (linkBridge != null) { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs index 78f6c9cf88f..ec9b08745c4 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs @@ -46,6 +46,10 @@ namespace System.Threading.Tasks.Dataflow { this.cancellationToken = cancellationToken; } + /// <summary> + /// Sends the item given in the constructor to the target block. + /// </summary> + /// <returns>Task that completes when the sending is done, or can't be performed.</returns> public Task<bool> Send () { cancellationTokenRegistration = cancellationToken.Register ( @@ -60,6 +64,9 @@ namespace System.Threading.Tasks.Dataflow { return taskCompletionSource.Task; } + /// <summary> + /// Offers the item to the target and hadles its response. + /// </summary> void PerformSend () { DisableCancel (); @@ -148,11 +155,19 @@ namespace System.Threading.Tasks.Dataflow { return false; } + /// <summary> + /// Temporarily disables cancelling. + /// </summary> void DisableCancel () { cancelDisabled = true; } + /// <summary> + /// Enables cancelling after it was disabled. + /// If cancellation was attempted in the meantime, + /// actually performs the cancelling. + /// </summary> void EnableCancel () { cancelDisabled = false; @@ -161,6 +176,9 @@ namespace System.Threading.Tasks.Dataflow { taskCompletionSource.SetCanceled (); } + /// <summary> + /// Sets the result of the operation. + /// </summary> void SetResult (bool result) { cancellationTokenRegistration.Dispose (); diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetCollection.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetCollection.cs index 0c01bc6c57b..8b49b461d1c 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetCollection.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetCollection.cs @@ -25,7 +25,7 @@ using System.Collections.Generic; namespace System.Threading.Tasks.Dataflow { /// <summary> - /// Collection of target blocks for a source block. + /// Base class for collection of target blocks for a source block. /// Also handles sending messages to the target blocks. /// </summary> abstract class TargetCollectionBase<T> { @@ -52,6 +52,10 @@ namespace System.Threading.Tasks.Dataflow { Reserved = new AtomicBoolean (); } + /// <summary> + /// Is called after a message was sent, makes sure the linked is destroyed after + /// <see cref="DataflowLinkOptions.MaxMessages"/> were sent. + /// </summary> public void MessageSent() { if (remainingMessages != -1) @@ -61,11 +65,17 @@ namespace System.Threading.Tasks.Dataflow { } readonly AtomicBoolean disabled = new AtomicBoolean (); + /// <summary> + /// Is the link destroyed? + /// </summary> public bool Disabled { get { return disabled.Value; } } + /// <summary> + /// Destroys the link to this target. + /// </summary> public void Dispose () { disabled.Value = true; @@ -74,16 +84,22 @@ namespace System.Threading.Tasks.Dataflow { cancellationTokenSource.Cancel (); Target ignored; - targetCollection.targetDictionary.TryRemove (TargetBlock, out ignored); + targetCollection.TargetDictionary.TryRemove (TargetBlock, out ignored); // to avoid memory leak; it could take a long time // before this object is actually removed from the collection TargetBlock = null; } + /// <summary> + /// Does this target have a postponed message? + /// </summary> public AtomicBoolean Postponed { get; private set; } - // used only by broadcast blocks + /// <summary> + /// Does this target have a reserved message? + /// </summary> + /// <remarks>Used only by broadcast blocks.</remarks> public AtomicBoolean Reserved { get; private set; } } @@ -95,7 +111,7 @@ namespace System.Threading.Tasks.Dataflow { readonly ConcurrentQueue<Target> appendQueue = new ConcurrentQueue<Target> (); readonly LinkedList<Target> targets = new LinkedList<Target> (); - protected readonly ConcurrentDictionary<ITargetBlock<T>, Target> targetDictionary = + protected readonly ConcurrentDictionary<ITargetBlock<T>, Target> TargetDictionary = new ConcurrentDictionary<ITargetBlock<T>, Target> (); // lastMessageHeaderId will be always accessed only from one thread @@ -106,13 +122,19 @@ namespace System.Threading.Tasks.Dataflow { bool firstOffering; T currentItem; - public TargetCollectionBase (ISourceBlock<T> block, bool broadcast, bool consumeToAccept) + protected TargetCollectionBase (ISourceBlock<T> block, bool broadcast, bool consumeToAccept) { this.block = block; this.broadcast = broadcast; this.consumeToAccept = consumeToAccept; } + /// <summary> + /// Adds a target block to send messages to. + /// </summary> + /// <returns> + /// An object that can be used to destroy the link to the added target. + /// </returns> public IDisposable AddTarget (ITargetBlock<T> targetBlock, DataflowLinkOptions options) { CancellationTokenSource cancellationTokenSource = null; @@ -129,7 +151,7 @@ namespace System.Threading.Tasks.Dataflow { var target = new Target ( this, targetBlock, options.MaxMessages, cancellationTokenSource); - targetDictionary [targetBlock] = target; + TargetDictionary [targetBlock] = target; if (options.Append) appendQueue.Enqueue (target); else @@ -138,6 +160,9 @@ namespace System.Threading.Tasks.Dataflow { return target; } + /// <summary> + /// Sets the current item to be offered to targets + /// </summary> public void SetCurrentItem (T item) { firstOffering = true; @@ -147,18 +172,32 @@ namespace System.Threading.Tasks.Dataflow { ClearUnpostponed (); } + /// <summary> + /// Clears the collection of "unpostponed" targets. + /// </summary> protected abstract void ClearUnpostponed (); + /// <summary> + /// Resets the current item to be offered to targets. + /// This means there is currently nothing to offer. + /// </summary> public void ResetCurrentItem () { currentItem = default(T); Thread.VolatileWrite (ref currentMessageHeaderId, 0); } + /// <summary> + /// Is there an item to send right now? + /// </summary> public bool HasCurrentItem { get { return Thread.VolatileRead (ref currentMessageHeaderId) != 0; } } + /// <summary> + /// Offers the current item to all eligible targets. + /// </summary> + /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns> public bool OfferItemToTargets () { // is there an item to offer? @@ -169,7 +208,7 @@ namespace System.Threading.Tasks.Dataflow { do { // order is important here, we want to make sure that prepended target - // added after appended target is always processed first + // added before appended target is processed first var appended = PrependOrAppend (false); var prepended = PrependOrAppend (true); @@ -192,6 +231,9 @@ namespace System.Threading.Tasks.Dataflow { return false; } + /// <summary> + /// Are there any targets that currently require a message to be sent to them? + /// </summary> public bool NeedsProcessing { get { return !appendQueue.IsEmpty || !prependQueue.IsEmpty @@ -199,8 +241,19 @@ namespace System.Threading.Tasks.Dataflow { } } + /// <summary> + /// Is the collection of unpostponed targets empty? + /// </summary> protected abstract bool UnpostponedIsEmpty { get; } + /// <summary> + /// Prepends (appends) targets that should be prepended (appended) to the collection of targets. + /// </summary> + /// <param name="prepend"><c>true</c> to prepend, <c>false</c> to append.</param> + /// <returns> + /// Nodes that contain first and last target added to the list, + /// or <c>null</c> if no nodes were added. + /// </returns> Tuple<LinkedListNode<Target>, LinkedListNode<Target>> PrependOrAppend ( bool prepend) { @@ -227,6 +280,10 @@ namespace System.Threading.Tasks.Dataflow { : Tuple.Create (first, last); } + /// <summary> + /// Offers the current item to the targets between the given nodes (inclusive). + /// </summary> + /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns> bool OfferItemToTargets ( Tuple<LinkedListNode<Target>, LinkedListNode<Target>> targetPair) { @@ -252,8 +309,16 @@ namespace System.Threading.Tasks.Dataflow { return false; } + /// <summary> + /// Offers the current item to unpostponed targets. + /// </summary> + /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns> protected abstract bool OfferItemToUnpostponed (); + /// <summary> + /// Offers the current item to the given target. + /// </summary> + /// <returns>Was the item accepted?</returns> protected bool OfferItem (Target target) { if (target.Reserved.Value) @@ -282,12 +347,18 @@ namespace System.Threading.Tasks.Dataflow { } } + /// <summary> + /// Returns whether the given header corresponds to the current item. + /// </summary> public bool VerifyHeader (DataflowMessageHeader header) { return header.Id == Thread.VolatileRead (ref currentMessageHeaderId); } } + /// <summary> + /// Target collection for non-broadcast blocks. + /// </summary> class TargetCollection<T> : TargetCollectionBase<T> { readonly ConcurrentQueue<Target> unpostponedTargets = new ConcurrentQueue<Target> (); @@ -297,20 +368,32 @@ namespace System.Threading.Tasks.Dataflow { { } + /// <summary> + /// Is the collection of unpostponed targets empty? + /// </summary> protected override bool UnpostponedIsEmpty { get { return unpostponedTargets.IsEmpty; } } + /// <summary> + /// Returns whether the given header corresponds to the current item + /// and that the given target block postponed this item. + /// </summary> public bool VerifyHeader (DataflowMessageHeader header, ITargetBlock<T> targetBlock) { return VerifyHeader (header) - && targetDictionary[targetBlock].Postponed.Value; + && TargetDictionary[targetBlock].Postponed.Value; } + /// <summary> + /// Unpostpones the given target. + /// </summary> + /// <param name="targetBlock">Target to unpostpone.</param> + /// <param name="messageConsumed">Did the target consume an item?</param> public void UnpostponeTarget (ITargetBlock<T> targetBlock, bool messageConsumed) { Target target; - if (!targetDictionary.TryGetValue (targetBlock, out target)) + if (!TargetDictionary.TryGetValue (targetBlock, out target)) return; if (messageConsumed) @@ -320,6 +403,9 @@ namespace System.Threading.Tasks.Dataflow { target.Postponed.Value = false; } + /// <summary> + /// Clears the collection of "unpostponed" targets. + /// </summary> protected override void ClearUnpostponed () { Target ignored; @@ -327,6 +413,10 @@ namespace System.Threading.Tasks.Dataflow { } } + /// <summary> + /// Offers the current item to unpostponed targets. + /// </summary> + /// <returns>Was the item accepted?</returns> protected override bool OfferItemToUnpostponed () { Target target; @@ -339,6 +429,9 @@ namespace System.Threading.Tasks.Dataflow { } } + /// <summary> + /// Target collection for broadcast blocks. + /// </summary> class BroadcastTargetCollection<T> : TargetCollectionBase<T> { // it's necessary to store the headers because of a race between // UnpostponeTargetConsumed and SetCurrentItem @@ -351,19 +444,30 @@ namespace System.Threading.Tasks.Dataflow { { } + /// <summary> + /// Is the collection of unpostponed targets empty? + /// </summary> protected override bool UnpostponedIsEmpty { get { return unpostponedTargets.IsEmpty; } } + /// <summary> + /// Marks the target as having a reserved message. + /// </summary> public void ReserveTarget (ITargetBlock<T> targetBlock) { - targetDictionary [targetBlock].Reserved.Value = true; + TargetDictionary [targetBlock].Reserved.Value = true; } + /// <summary> + /// Unpostpone target after it consumed a message. + /// </summary> + /// <param name="targetBlock">The target to unpostpone.</param> + /// <param name="header">Header of the message the target consumed.</param> public void UnpostponeTargetConsumed (ITargetBlock<T> targetBlock, DataflowMessageHeader header) { - Target target = targetDictionary [targetBlock]; + Target target = TargetDictionary [targetBlock]; target.MessageSent (); unpostponedTargets.Enqueue (Tuple.Create (target, header)); @@ -372,10 +476,13 @@ namespace System.Threading.Tasks.Dataflow { target.Reserved.Value = false; } + /// <summary> + /// Unpostpone target in the case when it didn't successfuly consume a message. + /// </summary> public void UnpostponeTargetNotConsumed (ITargetBlock<T> targetBlock) { Target target; - if (!targetDictionary.TryGetValue (targetBlock, out target)) + if (!TargetDictionary.TryGetValue (targetBlock, out target)) return; unpostponedTargets.Enqueue (Tuple.Create (target, @@ -385,6 +492,9 @@ namespace System.Threading.Tasks.Dataflow { target.Reserved.Value = false; } + /// <summary> + /// Clears the collection of "unpostponed" targets. + /// </summary> protected override void ClearUnpostponed () { Tuple<Target, DataflowMessageHeader> ignored; @@ -392,6 +502,10 @@ namespace System.Threading.Tasks.Dataflow { } } + /// <summary> + /// Offers the current item to unpostponed targets. + /// </summary> + /// <returns>Always <c>false</c>.</returns> protected override bool OfferItemToUnpostponed () { Tuple<Target, DataflowMessageHeader> tuple; diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs index 7349a084042..a1c37f4dce3 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs @@ -129,6 +129,10 @@ namespace System.Threading.Tasks.Dataflow { return outgoing.TryReceiveAll (out items); } + /// <summary> + /// Transforms one item from the queue if the transform delegate is synchronous. + /// </summary> + /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue is empty.</returns> bool TransformProcess () { TInput input; @@ -140,6 +144,11 @@ namespace System.Threading.Tasks.Dataflow { return dequeued; } + /// <summary> + /// Processes one item from the queue if the transform delegate is asynchronous. + /// </summary> + /// <param name="task">The Task that was returned by the synchronous part of the delegate.</param> + /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue was empty.</returns> bool AsyncTransformProcess (out Task<TOutput> task) { TInput input; @@ -153,6 +162,9 @@ namespace System.Threading.Tasks.Dataflow { return dequeued; } + /// <summary> + /// Process result of finished asynchronous transformation. + /// </summary> void AsyncProcessFinishedTask (Task<TOutput> task) { if (task == null || task.IsCanceled) @@ -172,21 +184,15 @@ namespace System.Threading.Tasks.Dataflow { } public Task Completion { - get { - return compHelper.Completion; - } + get { return compHelper.Completion; } } public int OutputCount { - get { - return outgoing.Count; - } + get { return outgoing.Count; } } public int InputCount { - get { - return messageQueue.Count; - } + get { return messageQueue.Count; } } public override string ToString () diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs index 194d228f569..7bb465728a5 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs @@ -127,6 +127,10 @@ namespace System.Threading.Tasks.Dataflow { return outgoing.TryReceiveAll (out items); } + /// <summary> + /// Transforms one item from the queue if the transform delegate is synchronous. + /// </summary> + /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue is empty.</returns> bool TransformProcess () { TInput input; @@ -141,6 +145,9 @@ namespace System.Threading.Tasks.Dataflow { return dequeued; } + /// <summary> + /// Adds the transformed collection to the output queue. + /// </summary> void EnqueueTransformed (IEnumerable<TOutput> transformed) { bool first = true; @@ -157,6 +164,11 @@ namespace System.Threading.Tasks.Dataflow { messageBox.DecreaseCount (); } + /// <summary> + /// Processes one item from the queue if the transform delegate is asynchronous. + /// </summary> + /// <param name="task">The Task that was returned by the synchronous part of the delegate.</param> + /// <returns>Returns whether an item was processed. Returns <c>false</c> if the queue was empty.</returns> bool AsyncTransformProcess (out Task<IEnumerable<TOutput>> task) { TInput input; @@ -170,6 +182,9 @@ namespace System.Threading.Tasks.Dataflow { return dequeued; } + /// <summary> + /// Process result of finished asynchronous transformation. + /// </summary> void ProcessFinishedTask (Task<IEnumerable<TOutput>> task) { if (task == null || task.IsCanceled) @@ -189,21 +204,15 @@ namespace System.Threading.Tasks.Dataflow { } public Task Completion { - get { - return compHelper.Completion; - } + get { return compHelper.Completion; } } public int OutputCount { - get { - return outgoing.Count; - } + get { return outgoing.Count; } } public int InputCount { - get { - return messageQueue.Count; - } + get { return messageQueue.Count; } } public override string ToString () diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs index 0711bed8045..6da70460598 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs @@ -116,6 +116,9 @@ namespace System.Threading.Tasks.Dataflow { return true; } + /// <summary> + /// Moves an item from the input queue to the output queue. + /// </summary> void BroadcastProcess () { T item; |