OrderPreservingSpoolingTask.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Scheduling / OrderPreservingSpoolingTask.cs / 1305376 / OrderPreservingSpoolingTask.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// OrderPreservingSpoolingTask.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Threading; 
using System.Threading.Tasks;
using System.Diagnostics.Contracts; 
 
namespace System.Linq.Parallel
{ 
    /// 
    /// A spooling task handles marshaling data from a producer to a consumer. It's given
    /// a single enumerator object that contains all of the production algorithms, a single
    /// destination channel from which consumers draw results, and (optionally) a 
    /// synchronization primitive using which to notify asynchronous consumers. This
    /// particular task variant preserves sort order in the final data. 
    ///  
    /// 
    ///  
    internal class OrderPreservingSpoolingTask : SpoolingTaskBase
    {
        private Shared m_results; // The destination array cell into which data is placed.
        private SortHelper m_sortHelper; // A helper that performs the sorting. 

        //------------------------------------------------------------------------------------ 
        // Creates, but does not execute, a new spooling task. 
        //
        // Arguments: 
        //     taskIndex   - the unique index of this task
        //     ordinalIndexState - the state of ordinal indices
        //     source      - the producer enumerator
        //     destination - the destination channel into which to spool elements 
        //
        // Assumptions: 
        //     Source cannot be null, although the other arguments may be. 
        //
 
        private OrderPreservingSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            Shared results, SortHelper sortHelper) :
            base(taskIndex, groupState) 
        {
            Contract.Assert(groupState != null); 
            Contract.Assert(results != null); 
            Contract.Assert(sortHelper != null);
 
            m_results = results;
            m_sortHelper = sortHelper;
        }
 
        //-----------------------------------------------------------------------------------
        // Creates and begins execution of a new spooling task. If pipelineMerges is specified, 
        // we will execute the task asynchronously; otherwise, this is done synchronously, 
        // and by the time this API has returned all of the results have been produced.
        // 
        // Arguments:
        //     source      - the producer enumerator
        //     destination - the destination channel into which to spool elements
        //     ordinalIndexState - state of the index of the input to the merge 
        //
        // Assumptions: 
        //     Source cannot be null, although the other arguments may be. 
        //
 
        internal static void Spool(
            QueryTaskGroupState groupState, PartitionedStream partitions,
            Shared results, TaskScheduler taskScheduler)
        { 
            Contract.Assert(groupState != null);
            Contract.Assert(partitions != null); 
            Contract.Assert(results != null); 
            Contract.Assert(results.Value == null);
 
            // Determine how many async tasks to create.
            int maxToRunInParallel = partitions.PartitionCount - 1;

            // Generate a set of sort helpers. 
            SortHelper[] sortHelpers =
                SortHelper.GenerateSortHelpers(partitions, groupState); 
 
            // Ensure all tasks in this query are parented under a common root.
            Task rootTask = new Task( 
                () =>
                {
                    // Create tasks that will enumerate the partitions in parallel.  We'll use the current
                    // thread for one task and then block before returning to the caller, until all results 
                    // have been accumulated. Pipelining is not supported by sort merges.
                    for (int i = 0; i < maxToRunInParallel; i++) 
                    { 
                        TraceHelpers.TraceInfo("OrderPreservingSpoolingTask::Spool: Running partition[{0}] asynchronously", i);
                        QueryTask asyncTask = new OrderPreservingSpoolingTask( 
                            i, groupState, results, sortHelpers[i]);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }
 
                    // Run one task synchronously on the current thread.
                    TraceHelpers.TraceInfo("OrderPreservingSpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel); 
                    QueryTask syncTask = new OrderPreservingSpoolingTask( 
                        maxToRunInParallel, groupState, results, sortHelpers[maxToRunInParallel]);
                    syncTask.RunSynchronously(taskScheduler); 
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask); 

            // We don't want to return until the task is finished.  Run it on the calling thread. 
            rootTask.RunSynchronously(taskScheduler); 

            // Destroy the state associated with our sort helpers. 
            for (int i = 0; i < sortHelpers.Length; i++)
            {
                sortHelpers[i].Dispose();
            } 

            // End the query, which has the effect of propagating any unhandled exceptions. 
            groupState.QueryEnd(false); 
        }
 
        //-----------------------------------------------------------------------------------
        // This method is responsible for enumerating results and enqueueing them to
        // the output channel(s) as appropriate.  Each base class implements its own.
        // 

        protected override void SpoolingWork() 
        { 
            Contract.Assert(m_sortHelper != null);
 
            // This task must perform a sort just prior to handing data to the merge.
            // We just defer to a sort helper object for this task.
            TInputOutput[] sortedOutput = m_sortHelper.Sort();
 
            if (!m_groupState.CancellationState.MergedCancellationToken.IsCancellationRequested)
            { 
                // The 0th task is responsible for communicating the results to the merging infrastructure. 
                // By this point, the results have been sorted, so we just publish a reference to the array.
                if (m_taskIndex == 0) 
                {
                    Contract.Assert(sortedOutput != null);
                    m_results.Value = sortedOutput;
                } 
            }
        } 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// OrderPreservingSpoolingTask.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Threading; 
using System.Threading.Tasks;
using System.Diagnostics.Contracts; 
 
namespace System.Linq.Parallel
{ 
    /// 
    /// A spooling task handles marshaling data from a producer to a consumer. It's given
    /// a single enumerator object that contains all of the production algorithms, a single
    /// destination channel from which consumers draw results, and (optionally) a 
    /// synchronization primitive using which to notify asynchronous consumers. This
    /// particular task variant preserves sort order in the final data. 
    ///  
    /// 
    ///  
    internal class OrderPreservingSpoolingTask : SpoolingTaskBase
    {
        private Shared m_results; // The destination array cell into which data is placed.
        private SortHelper m_sortHelper; // A helper that performs the sorting. 

        //------------------------------------------------------------------------------------ 
        // Creates, but does not execute, a new spooling task. 
        //
        // Arguments: 
        //     taskIndex   - the unique index of this task
        //     ordinalIndexState - the state of ordinal indices
        //     source      - the producer enumerator
        //     destination - the destination channel into which to spool elements 
        //
        // Assumptions: 
        //     Source cannot be null, although the other arguments may be. 
        //
 
        private OrderPreservingSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            Shared results, SortHelper sortHelper) :
            base(taskIndex, groupState) 
        {
            Contract.Assert(groupState != null); 
            Contract.Assert(results != null); 
            Contract.Assert(sortHelper != null);
 
            m_results = results;
            m_sortHelper = sortHelper;
        }
 
        //-----------------------------------------------------------------------------------
        // Creates and begins execution of a new spooling task. If pipelineMerges is specified, 
        // we will execute the task asynchronously; otherwise, this is done synchronously, 
        // and by the time this API has returned all of the results have been produced.
        // 
        // Arguments:
        //     source      - the producer enumerator
        //     destination - the destination channel into which to spool elements
        //     ordinalIndexState - state of the index of the input to the merge 
        //
        // Assumptions: 
        //     Source cannot be null, although the other arguments may be. 
        //
 
        internal static void Spool(
            QueryTaskGroupState groupState, PartitionedStream partitions,
            Shared results, TaskScheduler taskScheduler)
        { 
            Contract.Assert(groupState != null);
            Contract.Assert(partitions != null); 
            Contract.Assert(results != null); 
            Contract.Assert(results.Value == null);
 
            // Determine how many async tasks to create.
            int maxToRunInParallel = partitions.PartitionCount - 1;

            // Generate a set of sort helpers. 
            SortHelper[] sortHelpers =
                SortHelper.GenerateSortHelpers(partitions, groupState); 
 
            // Ensure all tasks in this query are parented under a common root.
            Task rootTask = new Task( 
                () =>
                {
                    // Create tasks that will enumerate the partitions in parallel.  We'll use the current
                    // thread for one task and then block before returning to the caller, until all results 
                    // have been accumulated. Pipelining is not supported by sort merges.
                    for (int i = 0; i < maxToRunInParallel; i++) 
                    { 
                        TraceHelpers.TraceInfo("OrderPreservingSpoolingTask::Spool: Running partition[{0}] asynchronously", i);
                        QueryTask asyncTask = new OrderPreservingSpoolingTask( 
                            i, groupState, results, sortHelpers[i]);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }
 
                    // Run one task synchronously on the current thread.
                    TraceHelpers.TraceInfo("OrderPreservingSpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel); 
                    QueryTask syncTask = new OrderPreservingSpoolingTask( 
                        maxToRunInParallel, groupState, results, sortHelpers[maxToRunInParallel]);
                    syncTask.RunSynchronously(taskScheduler); 
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask); 

            // We don't want to return until the task is finished.  Run it on the calling thread. 
            rootTask.RunSynchronously(taskScheduler); 

            // Destroy the state associated with our sort helpers. 
            for (int i = 0; i < sortHelpers.Length; i++)
            {
                sortHelpers[i].Dispose();
            } 

            // End the query, which has the effect of propagating any unhandled exceptions. 
            groupState.QueryEnd(false); 
        }
 
        //-----------------------------------------------------------------------------------
        // This method is responsible for enumerating results and enqueueing them to
        // the output channel(s) as appropriate.  Each base class implements its own.
        // 

        protected override void SpoolingWork() 
        { 
            Contract.Assert(m_sortHelper != null);
 
            // This task must perform a sort just prior to handing data to the merge.
            // We just defer to a sort helper object for this task.
            TInputOutput[] sortedOutput = m_sortHelper.Sort();
 
            if (!m_groupState.CancellationState.MergedCancellationToken.IsCancellationRequested)
            { 
                // The 0th task is responsible for communicating the results to the merging infrastructure. 
                // By this point, the results have been sorted, so we just publish a reference to the array.
                if (m_taskIndex == 0) 
                {
                    Contract.Assert(sortedOutput != null);
                    m_results.Value = sortedOutput;
                } 
            }
        } 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK