Welcome to mirror list, hosted at ThFree Co, Russian Federation.

Sorting.cs « Utils « Parallel « Linq « System « System.Core « referencesource « class « mcs - github.com/mono/mono.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: dba1de364e8cd5405a06da297946ee58ac14021c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
// 
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// Sorting.cs
//
// <OWNER>Microsoft</OWNER>
//
// Support for sorting.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{

    //---------------------------------------------------------------------------------------
    // The sort helper abstraction hides the implementation of our parallel merge sort.  See
    // comments below for more details.  In summary, there will be one sort helper per
    // partition.  Each will, in parallel read the whole key/value set from its input,
    // perform a local sort on this data, and then cooperatively merge with other concurrent
    // tasks to generate a single sorted output.  The local sort step is done using a simple
    // quick-sort algorithm.  Then we use a log(p) reduction to perform merges in parallel;
    // during each round of merges, half of the threads will stop doing work and may return.
    // At the end, one thread will remain and it holds the final sorted output.
    //

    internal abstract class SortHelper<TInputOutput>
    {
        // Consumes this helper's entire input partition and sorts it, cooperating with the
        // other partitions' helpers (which must be running concurrently -- see the class
        // comments above).  Returns this partition's portion of the sorted output.
        internal abstract TInputOutput[] Sort();
    }

    internal class SortHelper<TInputOutput, TKey> : SortHelper<TInputOutput>, IDisposable
    {

        private QueryOperatorEnumerator<TInputOutput, TKey> m_source; // The data source from which to pull data.
        private int m_partitionCount; // The partition count.
        private int m_partitionIndex; // This helper's index.

        // This data is shared among all partitions.
        private QueryTaskGroupState m_groupState; // To communicate status, e.g. cancellation.
        private int[][] m_sharedIndices; // Shared set of indices used during sorting.
        private GrowingArray<TKey>[] m_sharedKeys; // Shared keys with which to compare elements.
        private TInputOutput[][] m_sharedValues; // The actual values used for comparisons.
        private Barrier[,] m_sharedBarriers; // A matrix of barriers used for synchronizing during merges.
        private OrdinalIndexState m_indexState; // State of the order index
        private IComparer<TKey> m_keyComparer; // Comparer for the order keys

        //---------------------------------------------------------------------------------------
        // Creates a single sort helper object.  This is marked private to ensure the only
        // snippet of code that creates one is the factory, since creating many implies some
        // implementation detail in terms of dependencies which other places in the codebase
        // shouldn't need to worry about.
        //

        private SortHelper(QueryOperatorEnumerator<TInputOutput, TKey> source, int partitionCount, int partitionIndex,
            QueryTaskGroupState groupState, int[][] sharedIndices,
            OrdinalIndexState indexState, IComparer<TKey> keyComparer,
            GrowingArray<TKey>[] sharedkeys, TInputOutput[][] sharedValues, Barrier[,] sharedBarriers)
        {
            Contract.Assert(source != null);
            Contract.Assert(groupState != null);
            Contract.Assert(sharedIndices != null);
            Contract.Assert(sharedkeys != null);
            Contract.Assert(sharedValues != null);
            Contract.Assert(sharedBarriers != null);
            Contract.Assert(groupState.CancellationState.MergedCancellationToken != null);
            Contract.Assert(sharedIndices.Length <= sharedkeys.Length);
            Contract.Assert(sharedIndices.Length == sharedValues.Length);
            Contract.Assert(sharedIndices.Length == sharedBarriers.GetLength(1));
            Contract.Assert(groupState.CancellationState.MergedCancellationToken != null); // (duplicate of the assert above; harmless)

            m_source = source;
            m_partitionCount = partitionCount;
            m_partitionIndex = partitionIndex;
            m_groupState = groupState;
            m_sharedIndices = sharedIndices;
            m_indexState = indexState;
            m_keyComparer = keyComparer;
            m_sharedKeys = sharedkeys;
            m_sharedValues = sharedValues;
            m_sharedBarriers = sharedBarriers;

            Contract.Assert(m_sharedKeys.Length >= m_sharedValues.Length);
        }

        //---------------------------------------------------------------------------------------
        // Factory method to create a bunch of sort helpers that are all related.  Once created,
        // these helpers must all run concurrently with one another.
        //
        // Arguments:
        //     partitions    - the input data partitions to be sorted
        //     groupState    - common state used for communication (e.g. cancellation)
        //
        // Return Value:
        //     An array of helpers, one for each partition.
        //

        internal static SortHelper<TInputOutput, TKey>[] GenerateSortHelpers(
            PartitionedStream<TInputOutput, TKey> partitions, QueryTaskGroupState groupState)
        {
            int degreeOfParallelism = partitions.PartitionCount;
            SortHelper<TInputOutput, TKey>[] helpers = new SortHelper<TInputOutput, TKey>[degreeOfParallelism];

            // Calculate the next highest power of two greater than or equal to the DOP.
            // Also, calculate phaseCount = log2(degreeOfParallelismPow2)
            int degreeOfParallelismPow2 = 1, phaseCount = 0;
            while (degreeOfParallelismPow2 < degreeOfParallelism)
            {
                phaseCount++;
                degreeOfParallelismPow2 <<= 1;
            }

            // Initialize shared objects used during sorting.
            int[][] sharedIndices = new int[degreeOfParallelism][];
            GrowingArray<TKey>[] sharedKeys = new GrowingArray<TKey>[degreeOfParallelism];
            TInputOutput[][] sharedValues = new TInputOutput[degreeOfParallelism][];
            Barrier[,] sharedBarriers = new Barrier[phaseCount, degreeOfParallelism];

            if (degreeOfParallelism > 1)
            {
                // Initialize the barriers we need.  Due to the logarithmic reduction, we don't
                // need to populate the whole matrix.
                int offset = 1;
                for (int i = 0; i < sharedBarriers.GetLength(0); i++)
                {
                    for (int j = 0; j < sharedBarriers.GetLength(1); j++)
                    {
                        // As the phases increase, the barriers required become more and more sparse.
                        if ((j % offset) == 0)
                        {
                            sharedBarriers[i, j] = new Barrier(2);
                        }
                    }
                    offset *= 2;
                }
            }

            // Lastly populate the array of sort helpers.
            for (int i = 0; i < degreeOfParallelism; i++)
            {
                helpers[i] = new SortHelper<TInputOutput, TKey>(
                    partitions[i], degreeOfParallelism, i,
                    groupState, sharedIndices,
                    partitions.OrdinalIndexState, partitions.KeyComparer,
                    sharedKeys, sharedValues, sharedBarriers);
            }

            return helpers;
        }

        //---------------------------------------------------------------------------------------
        // Disposes of this sort helper's expensive state.
        //

        public void Dispose()
        {
            // We only dispose of the barriers when the 1st partition finishes.  That's because
            // all others depend on the shared barriers, so we can't get rid of them eagerly.
            if (m_partitionIndex == 0)
            {
                for (int i = 0; i < m_sharedBarriers.GetLength(0); i++)
                {
                    for (int j = 0; j < m_sharedBarriers.GetLength(1); j++)
                    {
                        Barrier b = m_sharedBarriers[i, j];
                        if (b != null)
                        {
                            b.Dispose();
                        }
                    }
                }
            }
        }

        //---------------------------------------------------------------------------------------
        // Sorts the data, possibly returning a result.
        //
        // Notes:
        //     This method makes some pretty fundamental assumptions about what concurrency
        //     exists in the system.  Namely, it assumes all SortHelpers are running in
        //     parallel.  If they aren't Sort will end up waiting for certain events that
        //     will never happen -- i.e. we will deadlock.
        //

        internal override TInputOutput[] Sort()
        {
            // Step 1.  Accumulate this partition's worth of input.
            GrowingArray<TKey> sourceKeys = null;
            List<TInputOutput> sourceValues = null;

            BuildKeysFromSource(ref sourceKeys, ref sourceValues);

            Contract.Assert(sourceValues != null, "values weren't populated");
            Contract.Assert(sourceKeys != null, "keys weren't populated");

            // Step 2.  Locally sort this partition's key indices in-place.
            QuickSortIndicesInPlace(sourceKeys, sourceValues, m_indexState);

            // Step 3. Enter into the merging phases, each separated by several barriers.
            if (m_partitionCount > 1)
            {
                // We only need to merge if there is more than 1 partition.
                MergeSortCooperatively();
            }

            return m_sharedValues[m_partitionIndex];
        }

        //-----------------------------------------------------------------------------------
        // Generates a list of values and keys from the data source.  After calling this,
        // the keys and values lists will be populated; each key at index i corresponds to
        // the value at index i in the other list.
        //
        // Notes:
        //    Should only be called once per sort helper.
        //

        private void BuildKeysFromSource(ref GrowingArray<TKey> keys, ref List<TInputOutput> values)
        {
            values = new List<TInputOutput>();

            // Enumerate the whole input set, generating a key set in the process.
            CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;
            try
            {
                TInputOutput current = default(TInputOutput);
                TKey currentKey = default(TKey);
                bool hadNext = m_source.MoveNext(ref current, ref currentKey);

                if (keys == null)
                {
                    keys = new GrowingArray<TKey>();
                }

                if (hadNext)
                {
                    int i = 0;
                    do
                    {
                        // Periodically poll for cancellation.  (NOTE(review): POLL_INTERVAL is
                        // used as a bit mask here, so presumably it is of the form 2^k - 1.)
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                            CancellationState.ThrowIfCanceled(cancelToken);

                        // Accumulate the keys and values so that we can sort them in a moment.
                        keys.Add(currentKey);
                        values.Add(current);
                    }
                    while (m_source.MoveNext(ref current, ref currentKey));
                }
            }
            finally
            {
                m_source.Dispose();
            }
        }

        //-----------------------------------------------------------------------------------
        // Produces a list of indices and sorts them in place using a local sort.
        //
        // Notes:
        //     Each element in the indices array is an index which refers to an element in
        //     the key/value array.  After calling this routine, the indices will be ordered
        //     such that the keys they refer to are in ascending or descending order,
        //     according to the sort criteria used.
        //

        private void QuickSortIndicesInPlace(GrowingArray<TKey> keys, List<TInputOutput> values, OrdinalIndexState ordinalIndexState)
        {
            Contract.Assert(keys != null);
            Contract.Assert(values != null);
            Contract.Assert(keys.Count == values.Count);

            // Generate a list of keys in forward order.  We will sort them in a moment.
            int[] indices = new int[values.Count];
            for (int i = 0; i < indices.Length; i++)
            {
                indices[i] = i;
            }

            // Now sort the indices in place.  (If the index state tells us the input is
            // already in increasing order, the data is already sorted and we can skip this.)
            if (indices.Length > 1
                && ordinalIndexState.IsWorseThan(OrdinalIndexState.Increasing))
            {
                QuickSort(0, indices.Length - 1, keys.InternalArray, indices, m_groupState.CancellationState.MergedCancellationToken);
            }

            if (m_partitionCount == 1)
            {
                // If there is only one partition, we will produce the final value set now,
                // since there will be no merge afterward (which is where we usually do this).
                TInputOutput[] sortedValues = new TInputOutput[values.Count];
                for (int i = 0; i < indices.Length; i++)
                {
                    sortedValues[i] = values[indices[i]];
                }
                m_sharedValues[m_partitionIndex] = sortedValues;
            }
            else
            {
                // Otherwise, a merge will happen.  Generate the shared data structures.
                m_sharedIndices[m_partitionIndex] = indices;
                m_sharedKeys[m_partitionIndex] = keys;
                m_sharedValues[m_partitionIndex] = new TInputOutput[values.Count];

                // Copy local structures to shared space.
                values.CopyTo(m_sharedValues[m_partitionIndex]);
            }
        }

        //-----------------------------------------------------------------------------------
        // Works cooperatively with other concurrent sort helpers to produce a final sorted
        // output list of data.  Here is an overview of the algorithm used.
        //
        // During each phase, we must communicate with a partner task.  As a simple
        // illustration, imagine we have 8 partitions (P=8), numbered 0-7.  There will be
        // Log2(O)+2 phases (separated by barriers), where O is the next power of two greater
        // than or equal to P, in the sort operation:
        //
        //     Pairs:   (P = 8)
        //        phase=L:     [0][1] [2][3] [4][5] [6][7]
        //        phase=0:     [0,1]  [2,3]  [4,5]  [6,7]
        //        phase=1:     [0,2]         [4,6]
        //        phase=2:     [0,4]
        //        phase=M:     [0]
        //
        // During phase L, each partition locally sorts its data.  Then, at each subsequent
        // phase in the logarithmic reduction, two partitions are paired together and cooperate
        // to accomplish a portion of the merge.  The left one then goes on to choose another
        // partner, in the next phase, and the right one exits.  And so on, until phase M, when
        // there is just one partition left (the 0th), which is when it may publish the final
        // output from the sort operation.
        //
        // Notice we mentioned rounding up to the next power of two when determining the number
        // of phases.  Values of P which aren't powers of 2 are slightly problematic, because
        // they create a load imbalance in one of the partitions and heighten the depth of the
        // logarithmic tree.  As an illustration, imagine this case:
        //
        //     Pairs:   (P = 5)
        //        phase=L:    [0][1] [2][3] [4]
        //        phase=0:    [0,1]  [2,3]  [4,X]  [X,X]
        //        phase=1:    [0,2]         [4,X]
        //        phase=2:    [0,4]
        //        phase=M:    [0]
        //
        // Partition #4 in this example performs its local sort during phase L, but then has nothing
        // to do during phases 0 and 1.  (I.e. it has nobody to merge with.)  Only during phase 2
        // does it then resume work and help phase 2 perform its merge.  This is modeled a bit like
        // there were actually 8 partitions, which is the next power of two greater than or equal to
        // 5.  This example was chosen as an extreme case of imbalance.  We stall a processor (the 5th)
        // for two complete phases.  If P = 6 or 7, the problem would not be nearly so bad, but if
        // P = 9, the last partition would stall for yet another phase (and so on for every power of
        // two boundary).  We handle these cases, but note that an overabundance of them will probably
        // negatively impact speedups.
        //

        private void MergeSortCooperatively()
        {
            CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;

            int phaseCount = m_sharedBarriers.GetLength(0);
            for (int phase = 0; phase < phaseCount; phase++)
            {
                bool isLastPhase = (phase == (phaseCount - 1));

                // Calculate our partner for this phase and the next.
                int partnerIndex = ComputePartnerIndex(phase);

                // If we have a partner (see above for non power of 2 cases and why the index returned might
                // be out of bounds), we will coordinate with the partner to produce the merged output.
                if (partnerIndex < m_partitionCount)
                {
                    // Cache references to our local data.
                    int[] myIndices = m_sharedIndices[m_partitionIndex];
                    GrowingArray<TKey> myKeys = m_sharedKeys[m_partitionIndex];
                    TKey[] myKeysArr = myKeys.InternalArray;

                    TInputOutput[] myValues = m_sharedValues[m_partitionIndex];


                    // First we must rendezvous with our merge partner so we know the previous sort
                    // and merge phase has been completed.  By convention, we always use the left-most
                    // partner's barrier for this; all that matters is that both use the same.
                    m_sharedBarriers[phase, Math.Min(m_partitionIndex, partnerIndex)].SignalAndWait(cancelToken);

                    // Grab the two sorted inputs and then merge them cooperatively into one list.  One
                    // worker merges from left-to-right until it's placed elements up to the half-way
                    // point, and the other worker does the same, but only from right-to-left.
                    if (m_partitionIndex < partnerIndex)
                    {
                        // Before moving on to the actual merge, the left-most partition will allocate data
                        // to hold the merged indices and key/value pairs.

                        // First, remember a copy of all of the partner's lists.
                        int[] rightIndices = m_sharedIndices[partnerIndex];
                        TKey[] rightKeys = m_sharedKeys[partnerIndex].InternalArray;
                        TInputOutput[] rightValues = m_sharedValues[partnerIndex];

                        // We copy the our own items into the right's (overwriting its values) so that it can
                        // retrieve them after the barrier.  This is an exchange operation.
                        m_sharedIndices[partnerIndex] = myIndices;
                        m_sharedKeys[partnerIndex] = myKeys;
                        m_sharedValues[partnerIndex] = myValues;

                        int leftCount = myValues.Length;
                        int rightCount = rightValues.Length;
                        int totalCount = leftCount + rightCount;

                        // Now allocate the lists into which the merged data will go.  Share this
                        // with the other thread so that it can place data into it as well.
                        int[] mergedIndices = null;
                        TInputOutput[] mergedValues = new TInputOutput[totalCount];

                        // Only on the last phase do we need to remember indices and keys.
                        if (!isLastPhase)
                        {
                            mergedIndices = new int[totalCount];
                        }

                        // Publish our newly allocated merged data structures.
                        m_sharedIndices[m_partitionIndex] = mergedIndices;
                        m_sharedKeys[m_partitionIndex] = myKeys;
                        m_sharedValues[m_partitionIndex] = mergedValues;

                        Contract.Assert(myKeysArr != null);

                        m_sharedBarriers[phase, m_partitionIndex].SignalAndWait(cancelToken);

                        // Merge the left half into the shared merged space.  This is a normal merge sort with
                        // the caveat that we stop merging once we reach the half-way point (since our partner
                        // is doing the same for the right half).  Note that during the last phase we only
                        // copy the values and not the indices or keys.
                        int m = (totalCount + 1)/2;
                        int i = 0, j0 = 0, j1 = 0;
                        while (i < m)
                        {
                            if ((i & CancellationState.POLL_INTERVAL) == 0)
                                CancellationState.ThrowIfCanceled(cancelToken);

                            if (j0 < leftCount && (j1 >= rightCount ||
                                                   m_keyComparer.Compare(myKeysArr[myIndices[j0]],
                                                                         rightKeys[rightIndices[j1]]) <= 0))
                            {
                                if (isLastPhase)
                                {
                                    mergedValues[i] = myValues[myIndices[j0]];
                                }
                                else
                                {
                                    mergedIndices[i] = myIndices[j0];
                                }
                                j0++;
                            }
                            else
                            {
                                if (isLastPhase)
                                {
                                    mergedValues[i] = rightValues[rightIndices[j1]];
                                }
                                else
                                {
                                    // Right-side indices are offset by leftCount, because the merged value
                                    // array is our values followed by the partner's values (see below).
                                    mergedIndices[i] = leftCount + rightIndices[j1];
                                }
                                j1++;
                            }
                            i++;
                        }

                        // If it's not the last phase, bulk copy our values into the left half of the
                        // merged value array.  (Our keys need no copy; myKeys was published above and
                        // the right partner appends its keys to it.)
                        if (!isLastPhase && leftCount > 0)
                        {
                            Array.Copy(myValues, 0, mergedValues, 0, leftCount);
                        }

                        // And now just wait for the second half.  We never reuse the same barrier across multiple
                        // phases, so we can always dispose of it when we wake up.
                        m_sharedBarriers[phase, m_partitionIndex].SignalAndWait(cancelToken);
                    }
                    else
                    {
                        // Wait for the other partition to allocate the shared data.
                        m_sharedBarriers[phase, partnerIndex].SignalAndWait(cancelToken);

                        // After the barrier, the other partition will have made two things available to us:
                        // (1) its own indices, keys, and values, stored in the cell that used to hold our data,
                        // and (2) the arrays into which merged data will go, stored in its shared array cells.  
                        // We will snag references to all of these things.
                        int[] leftIndices = m_sharedIndices[m_partitionIndex];
                        TKey[] leftKeys = m_sharedKeys[m_partitionIndex].InternalArray;
                        TInputOutput[] leftValues = m_sharedValues[m_partitionIndex];
                        int[] mergedIndices = m_sharedIndices[partnerIndex];
                        GrowingArray<TKey> mergedKeys = m_sharedKeys[partnerIndex];
                        TInputOutput[] mergedValues = m_sharedValues[partnerIndex];

                        Contract.Assert(leftValues != null);
                        Contract.Assert(leftKeys != null);

                        int leftCount = leftValues.Length;
                        int rightCount = myValues.Length;
                        int totalCount = leftCount + rightCount;

                        // Merge the right half into the shared merged space.  This is a normal merge sort with
                        // the caveat that we stop merging once we reach the half-way point (since our partner
                        // is doing the same for the left half).  Note that during the last phase we only
                        // copy the values and not the indices or keys.
                        int m = (totalCount + 1)/2;
                        int i = totalCount - 1, j0 = leftCount - 1, j1 = rightCount - 1;
                        while (i >= m)
                        {
                            if ((i & CancellationState.POLL_INTERVAL) == 0)
                                CancellationState.ThrowIfCanceled(cancelToken);

                            if (j0 >= 0 && (j1 < 0 ||
                                            m_keyComparer.Compare(leftKeys[leftIndices[j0]],
                                                                  myKeysArr[myIndices[j1]]) > 0))
                            {
                                if (isLastPhase)
                                {
                                    mergedValues[i] = leftValues[leftIndices[j0]];
                                }
                                else
                                {
                                    mergedIndices[i] = leftIndices[j0];
                                }
                                j0--;
                            }
                            else
                            {
                                if (isLastPhase)
                                {
                                    mergedValues[i] = myValues[myIndices[j1]];
                                }
                                else
                                {
                                    mergedIndices[i] = leftCount + myIndices[j1];
                                }
                                j1--;
                            }
                            i--;
                        }

                        // If it's not the last phase, we just bulk propagate the keys and values.
                        if (!isLastPhase && myValues.Length > 0)
                        {
                            mergedKeys.CopyFrom(myKeysArr, myValues.Length);
                            Array.Copy(myValues, 0, mergedValues, leftCount, myValues.Length);
                        }

                        // Wait for our partner to finish copying too.
                        m_sharedBarriers[phase, partnerIndex].SignalAndWait(cancelToken);

                        // Now the greater of the two partners can leave, it's done.
                        break;
                    }
                }
            }
        }

        //---------------------------------------------------------------------------------------
        // Computes our partner index given the logarithmic reduction algorithm specified above.
        //

        private int ComputePartnerIndex(int phase)
        {
            int offset = 1 << phase;
            return m_partitionIndex + ((m_partitionIndex % (offset * 2)) == 0 ? offset : -offset);
        }

        //---------------------------------------------------------------------------------------
        // Sort algorithm used to sort key/value lists. After this has been called, the indices
        // will have been placed in sorted order based on the keys provided.
        //

        private void QuickSort(int left, int right, TKey[] keys, int[] indices, CancellationToken cancelToken)
        {
            Contract.Assert(keys != null, "need a non-null keyset");
            Contract.Assert(keys.Length >= indices.Length);
            Contract.Assert(left <= right);
            Contract.Assert(0 <= left && left < keys.Length);
            Contract.Assert(0 <= right && right < keys.Length);

            // cancellation check.
            // only test for intervals that are wider than so many items, else this test is 
            // relatively expensive compared to the work being performed.
            if (right - left > CancellationState.POLL_INTERVAL)
                CancellationState.ThrowIfCanceled(cancelToken);

            do
            {
                int i = left;
                int j = right;
                int pivot = indices[i + ((j - i) >> 1)];
                TKey pivotKey = keys[pivot];

                do
                {
                    while (m_keyComparer.Compare(keys[indices[i]], pivotKey) < 0) i++;
                    while (m_keyComparer.Compare(keys[indices[j]], pivotKey) > 0) j--;

                    Contract.Assert(i >= left && j <= right, "(i>=left && j<=right) sort failed - bogus IComparer?");

                    if (i > j)
                    {
                        break;
                    }

                    if (i < j)
                    {
                        // Swap the indices.
                        int tmp = indices[i];
                        indices[i] = indices[j];
                        indices[j] = tmp;
                    }

                    i++;
                    j--;
                }
                while (i <= j);

                // Recurse into the smaller side and loop on the larger side, which bounds the
                // recursion depth at O(log n).
                if (j - left <= right - i)
                {
                    if (left < j)
                    {
                        QuickSort(left, j, keys, indices, cancelToken);
                    }
                    left = i;
                }
                else
                {
                    if (i < right)
                    {
                        QuickSort(i, right, keys, indices, cancelToken);
                    }
                    right = j;
                }
            }
            while (left < right);
        }
    }
}