DefaultMergeHelper.cs source code in C# .NET

Source code for the .NET Framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Merging / DefaultMergeHelper.cs / 1305376 / DefaultMergeHelper.cs

// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// DefaultMergeHelper.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Threading;
using System.Threading.Tasks; 
using System.Diagnostics.Contracts; 

namespace System.Linq.Parallel 
{
    /// <summary>
    /// The default merge helper uses a set of straightforward algorithms for output
    /// merging. Namely, for synchronous merges, the input data is yielded from the
    /// input data streams in "depth first" left-to-right order. For asynchronous merges,
    /// on the other hand, we use a biased choice algorithm to favor input channels in
    /// a "fair" way. No order preservation is carried out by this helper.
    /// </summary>
    /// <typeparam name="TInputOutput"></typeparam>
    /// <typeparam name="TIgnoreKey"></typeparam>
    internal class DefaultMergeHelper<TInputOutput, TIgnoreKey> : IMergeHelper<TInputOutput>
    {
        private QueryTaskGroupState m_taskGroupState; // State shared among tasks. 
        private PartitionedStream<TInputOutput, TIgnoreKey> m_partitions; // Source partitions.
        private AsynchronousChannel<TInputOutput>[] m_asyncChannels; // Destination channels (async).
        private SynchronousChannel<TInputOutput>[] m_syncChannels; // Destination channels (sync).
        private IEnumerator<TInputOutput> m_channelEnumerator; // Output enumerator.
        private TaskScheduler m_taskScheduler; // The task manager to execute the query. 
        private bool m_ignoreOutput; // Whether we're enumerating "for effect".

        //------------------------------------------------------------------------------------
        // Instantiates a new merge helper. 
        //
        // Arguments: 
        //     partitions   - the source partitions from which to consume data. 
        //     ignoreOutput - whether we're enumerating "for effect" or for output.
        //     options      - the merge options, which control whether to use a pipelined merge.
        //

        internal DefaultMergeHelper(PartitionedStream<TInputOutput, TIgnoreKey> partitions, bool ignoreOutput, ParallelMergeOptions options,
            TaskScheduler taskScheduler, CancellationState cancellationState, int queryId) 
        {
            Contract.Assert(partitions != null); 
 
            m_taskGroupState = new QueryTaskGroupState(cancellationState, queryId);
            m_partitions = partitions; 
            m_taskScheduler = taskScheduler;
            m_ignoreOutput = ignoreOutput;

            TraceHelpers.TraceInfo("DefaultMergeHelper::.ctor(..): creating a default merge helper"); 

            // If output won't be ignored, we need to manufacture a set of channels for the consumer. 
            // Otherwise, when the merge is executed, we'll just invoke the activities themselves. 
            if (!ignoreOutput)
            { 
                // Create the asynchronous or synchronous channels, based on whether we're pipelining.
                if (options != ParallelMergeOptions.FullyBuffered)
                {
                    if (partitions.PartitionCount > 1) 
                    {
                        m_asyncChannels = 
                            MergeExecutor<TInputOutput>.MakeAsynchronousChannels(partitions.PartitionCount, options, cancellationState.MergedCancellationToken);
                        m_channelEnumerator = new AsynchronousChannelMergeEnumerator<TInputOutput>(m_taskGroupState, m_asyncChannels);
                    } 
                    else
                    {
                        // If there is only one partition, we don't need to create channels. The only producer enumerator
                        // will be used as the result enumerator. 
                        m_channelEnumerator = ExceptionAggregator.WrapQueryEnumerator(partitions[0], m_taskGroupState.CancellationState).GetEnumerator();
                    } 
                } 
                else
                { 
                    m_syncChannels =
                        MergeExecutor<TInputOutput>.MakeSynchronousChannels(partitions.PartitionCount);
                    m_channelEnumerator = new SynchronousChannelMergeEnumerator<TInputOutput>(m_taskGroupState, m_syncChannels);
                } 

                Contract.Assert(m_asyncChannels == null || m_asyncChannels.Length == partitions.PartitionCount); 
                Contract.Assert(m_syncChannels == null || m_syncChannels.Length == partitions.PartitionCount); 
                Contract.Assert(m_channelEnumerator != null, "enumerator can't be null if we're not ignoring output");
            } 
        }

        //-----------------------------------------------------------------------------------
        // Schedules execution of the merge itself. 
        //
 
        void IMergeHelper<TInputOutput>.Execute()
        {
            if (m_asyncChannels != null)
            { 
                SpoolingTask.SpoolPipeline<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_asyncChannels, m_taskScheduler);
            } 
            else if (m_syncChannels != null) 
            {
                SpoolingTask.SpoolStopAndGo<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_syncChannels, m_taskScheduler);
            }
            else if (m_ignoreOutput)
            {
                SpoolingTask.SpoolForAll<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_taskScheduler);
            }
            else 
            { 
                // The last case is a pipelining merge when DOP = 1. In this case, the consumer thread itself will compute the results,
                // so we don't need any tasks to compute the results asynchronously. 
                Contract.Assert(m_partitions.PartitionCount == 1);
            }
        }
 
        //-----------------------------------------------------------------------------------
        // Gets the enumerator from which to enumerate output results. 
        // 

        IEnumerator<TInputOutput> IMergeHelper<TInputOutput>.GetEnumerator()
        {
            Contract.Assert(m_ignoreOutput || m_channelEnumerator != null);
            return m_channelEnumerator;
        } 

        //----------------------------------------------------------------------------------- 
        // Returns the results as an array. 
        //
        // @

        public TInputOutput[] GetResultsAsArray() 
        { 
            if (m_syncChannels != null)
            { 
                // Right size an array.
                int totalSize = 0;
                for (int i = 0; i < m_syncChannels.Length; i++)
                { 
                    totalSize += m_syncChannels[i].Count;
                } 
                TInputOutput[] array = new TInputOutput[totalSize]; 

                // And then blit the elements in. 
                int current = 0;
                for (int i = 0; i < m_syncChannels.Length; i++)
                {
                    m_syncChannels[i].CopyTo(array, current); 
                    current += m_syncChannels[i].Count;
                } 
                return array; 
            }
            else 
            {
                List<TInputOutput> output = new List<TInputOutput>();
                using (IEnumerator<TInputOutput> enumerator = ((IMergeHelper<TInputOutput>)this).GetEnumerator())
                { 
                    while (enumerator.MoveNext())
                    { 
                        output.Add(enumerator.Current); 
                    }
                } 

                return output.ToArray();
            }
        } 
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
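
The merge strategy above is selected by the ParallelMergeOptions value passed down from the public PLINQ API: NotBuffered and AutoBuffered queries take the pipelined (asynchronous channel) path, while FullyBuffered queries take the stop-and-go (synchronous channel) path. The sketch below is not part of the file; it is a minimal, hypothetical illustration of how a caller reaches these paths through ParallelEnumerable.WithMergeOptions. (Whether a particular query actually routes through DefaultMergeHelper also depends on other factors; for example, order-preserving queries use a different merge helper.)

Example:

// Illustrative only -- demonstrates how ParallelMergeOptions selects between the
// pipelined (asynchronous channel) and fully buffered (synchronous channel) merges
// implemented by the class above. The class and variable names are made up for the demo.
using System;
using System.Linq;

static class MergeOptionsDemo
{
    static void Main()
    {
        var source = Enumerable.Range(0, 1000000);

        // Pipelined merge: items are handed to the consumer while producer
        // tasks are still running (the SpoolPipeline / async-channel path).
        var pipelined = source.AsParallel()
                              .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                              .Select(x => x * 2);
        foreach (var item in pipelined)
        {
            Console.WriteLine(item); // consumption can begin before all partitions finish
            break;
        }

        // Fully buffered merge: every partition completes before the consumer
        // sees any output (the SpoolStopAndGo / sync-channel path).
        int[] buffered = source.AsParallel()
                               .WithMergeOptions(ParallelMergeOptions.FullyBuffered)
                               .Select(x => x * 2)
                               .ToArray();
        Console.WriteLine(buffered.Length);
    }
}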


                        
