MergeExecutor.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Merging / MergeExecutor.cs / 1305376 / MergeExecutor.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// MergeExecutor.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections; 
using System.Collections.Generic;
using System.Threading; 
using System.Threading.Tasks; 
using System.Diagnostics.Contracts;
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// Drives execution of an actual merge operation, including creating channel data
    /// structures and scheduling parallel work as appropriate. The algorithms used
    /// internally are parameterized based on the type of data in the partitions; e.g.
    /// if an order preserved stream is found, the merge will automatically use an
    /// order preserving merge, and so forth.
    /// </summary>
    /// <typeparam name="TInputOutput">The element type of the merged result stream.</typeparam>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this class were reconstructed after
    /// they had been stripped from the listing (TInputOutput was referenced but never
    /// declared). Confirm the arities used for PartitionedStream, IMergeHelper, the
    /// merge helper classes and the channel classes against their declarations.
    /// </remarks>
    internal class MergeExecutor<TInputOutput> : IEnumerable<TInputOutput>
    {

        // Many internal algorithms are parameterized based on the data. The IMergeHelper
        // is the pluggable interface whose implementations perform those algorithms.
        // It is assigned exactly once, inside the static Execute factory below.
        private IMergeHelper<TInputOutput> m_mergeHelper;

        // Private constructor. MergeExecutor should only be constructed via the
        // MergeExecutor.Execute static method.
        private MergeExecutor()
        {
        }

        //------------------------------------------------------------------------------------
        // Creates and executes a new merge executor object.
        //
        // Arguments:
        //     partitions        - the partitions whose data will be merged into one stream
        //     ignoreOutput      - if true, we are enumerating "for effect", and we won't actually
        //                         generate data in the output stream; only valid together with
        //                         ParallelMergeOptions.FullyBuffered (asserted below)
        //     options           - the buffering mode; anything other than FullyBuffered selects a
        //                         pipelined merge when the ordering of the partitions allows it
        //     taskScheduler     - forwarded to the selected merge helper
        //     isOrdered         - whether to perform an ordering merge.
        //     cancellationState - forwarded to the selected merge helper
        //     queryId           - forwarded to the selected merge helper
        //
        // Return Value:
        //     The new executor. Its merge helper has already been started (see the private
        //     Execute method) by the time this method returns.
        //

        internal static MergeExecutor<TInputOutput> Execute<TKey>(
            PartitionedStream<TInputOutput, TKey> partitions, bool ignoreOutput, ParallelMergeOptions options, TaskScheduler taskScheduler, bool isOrdered,
            CancellationState cancellationState, int queryId)
        {
            Contract.Assert(partitions != null);
            Contract.Assert(partitions.PartitionCount > 0);
            Contract.Assert(!ignoreOutput || options == ParallelMergeOptions.FullyBuffered, "@BUGBUG: pipelining w/ no output not supported -- need it?");

            MergeExecutor<TInputOutput> mergeExecutor = new MergeExecutor<TInputOutput>();
            if (isOrdered && !ignoreOutput)
            {
                if (options != ParallelMergeOptions.FullyBuffered && !partitions.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing))
                {
                    Contract.Assert(options == ParallelMergeOptions.NotBuffered || options == ParallelMergeOptions.AutoBuffered);
                    bool autoBuffered = (options == ParallelMergeOptions.AutoBuffered);

                    if (partitions.PartitionCount > 1)
                    {
                        // We use a pipelining ordered merge. The round trip through object is
                        // needed because the compiler cannot prove that TKey is the key type
                        // the pipelining helper expects.
                        // NOTE(review): the int key type is reconstructed -- confirm against
                        // OrderPreservingPipeliningMergeHelper's constructor.
                        mergeExecutor.m_mergeHelper = new OrderPreservingPipeliningMergeHelper<TInputOutput>(
                            (PartitionedStream<TInputOutput, int>)(object)partitions, taskScheduler, cancellationState, autoBuffered, queryId);
                    }
                    else
                    {
                        // When DOP=1, the default merge simply returns the single producer enumerator to the consumer. This way, ordering
                        // does not add any extra overhead, and no producer task needs to be scheduled.
                        mergeExecutor.m_mergeHelper = new DefaultMergeHelper<TInputOutput, TKey>(
                            partitions, false, options, taskScheduler, cancellationState, queryId);
                    }
                }
                else
                {
                    // We use a stop-and-go ordered merge helper
                    mergeExecutor.m_mergeHelper = new OrderPreservingMergeHelper<TInputOutput, TKey>(partitions, taskScheduler, cancellationState, queryId);
                }
            }
            else
            {
                // We use a default - unordered - merge helper.
                mergeExecutor.m_mergeHelper = new DefaultMergeHelper<TInputOutput, TKey>(partitions, ignoreOutput, options, taskScheduler, cancellationState, queryId);
            }

            mergeExecutor.Execute();
            return mergeExecutor;
        }

        //-----------------------------------------------------------------------------------
        // Initiates execution of the merge by delegating to the selected merge helper.
        //

        private void Execute()
        {
            Contract.Assert(m_mergeHelper != null);
            m_mergeHelper.Execute();
        }

        //-----------------------------------------------------------------------------------
        // Returns an enumerator that will yield elements from the resulting merged data
        // stream.
        //

        IEnumerator IEnumerable.GetEnumerator()
        {
            // Delegate to the strongly typed overload. Casting 'this' back to the very
            // interface being implemented here would make this method call itself.
            return GetEnumerator();
        }

        public IEnumerator<TInputOutput> GetEnumerator()
        {
            Contract.Assert(m_mergeHelper != null);
            return m_mergeHelper.GetEnumerator();
        }

        //-----------------------------------------------------------------------------------
        // Returns the merged results as an array.
        //

        internal TInputOutput[] GetResultsAsArray()
        {
            Contract.Assert(m_mergeHelper != null);
            return m_mergeHelper.GetResultsAsArray();
        }

        //------------------------------------------------------------------------------------
        // This internal helper method is used to generate a set of asynchronous channels.
        // The algorithm used by each channel contains the necessary synchronization to
        // ensure it is suitable for pipelined consumption.
        //
        // Arguments:
        //     partitionCount    - the number of partitions for which to create new channels.
        //     options           - the buffering mode; must be NotBuffered or AutoBuffered
        //                         (asserted below). NotBuffered uses a chunk size of 1,
        //                         AutoBuffered lets the channel pick its chunk size.
        //     cancellationToken - passed to every channel that is created.
        //
        // Return Value:
        //     An array of asynchronous channels, one for each partition.
        //

        internal static AsynchronousChannel<TInputOutput>[] MakeAsynchronousChannels(int partitionCount, ParallelMergeOptions options, CancellationToken cancellationToken)
        {
            AsynchronousChannel<TInputOutput>[] channels = new AsynchronousChannel<TInputOutput>[partitionCount];

            Contract.Assert(options == ParallelMergeOptions.NotBuffered || options == ParallelMergeOptions.AutoBuffered);
            TraceHelpers.TraceInfo("MergeExecutor::MakeChannels: setting up {0} async channels in prep for pipeline", partitionCount);

            // If we are pipelining, we need a channel that contains the necessary synchronization
            // in it. We choose a bounded/blocking channel data structure: bounded so that we can
            // limit the amount of memory overhead used by the query by putting a cap on the
            // buffer size into which producers place data, and blocking so that the consumer can
            // wait for additional data to arrive in the case that it's found to be empty.

            int chunkSize = 0; // 0 means automatic chunk size
            if (options == ParallelMergeOptions.NotBuffered)
            {
                chunkSize = 1;
            }

            for (int i = 0; i < channels.Length; i++)
            {
                channels[i] = new AsynchronousChannel<TInputOutput>(chunkSize, cancellationToken);
            }

            return channels;
        }

        //-----------------------------------------------------------------------------------
        // This internal helper method is used to generate a set of synchronous channels.
        // The channel data structure used has been optimized for sequential execution and
        // does not support pipelining.
        //
        // Arguments:
        //     partitionCount - the number of partitions for which to create new channels.
        //
        // Return Value:
        //     An array of synchronous channels, one for each partition.
        //

        internal static SynchronousChannel<TInputOutput>[] MakeSynchronousChannels(int partitionCount)
        {
            SynchronousChannel<TInputOutput>[] channels = new SynchronousChannel<TInputOutput>[partitionCount];

            TraceHelpers.TraceInfo("MergeExecutor::MakeChannels: setting up {0} channels in prep for stop-and-go", partitionCount);

            // We just build up the results in memory using simple, dynamically growable FIFO queues.
            for (int i = 0; i < channels.Length; i++)
            {
                channels[i] = new SynchronousChannel<TInputOutput>();
            }

            return channels;
        }

    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK