QueryTaskGroupState.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Scheduling / QueryTaskGroupState.cs / 1305376 / QueryTaskGroupState.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// QueryTaskGroupState.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Threading;
using System.Threading.Tasks; 
using System.Diagnostics.Contracts; 

namespace System.Linq.Parallel 
{
    /// 
    /// A collection of tasks used by a single query instance. This type also offers some
    /// convenient methods for tracing significant ETW events, waiting on tasks, propagating 
    /// exceptions, and performing cancellation activities.
    ///  
    internal class QueryTaskGroupState 
    {
        private Task m_rootTask; // The task under which all query tasks root. 
        private int m_alreadyEnded; // Whether the tasks have been waited on already.
        private CancellationState m_cancellationState; // The cancellation state.
        private int m_queryId; // Id of this query execution.
 

        //------------------------------------------------------------------------------------ 
        // Creates a new shared bit of state among tasks. 
        //
 
        internal QueryTaskGroupState(CancellationState cancellationState, int queryId)
        {
            m_cancellationState = cancellationState;
            m_queryId = queryId; 
        }
 
        //----------------------------------------------------------------------------------- 
        // Whether this query has ended or not.
        // 

        internal bool IsAlreadyEnded
        {
            get { return m_alreadyEnded == 1; } 
        }
 
        //----------------------------------------------------------------------------------- 
        // Cancellation state, used to tear down tasks cooperatively when necessary.
        // 

        internal CancellationState CancellationState
        {
            get { return m_cancellationState; } 
        }
 
        //----------------------------------------------------------------------------------- 
        // Id of this query execution.
        // 

        internal int QueryId
        {
            get { return m_queryId; } 
        }
 
        //------------------------------------------------------------------------------------ 
        // Marks the beginning of a query's execution.
        // 

        internal void QueryBegin(Task rootTask)
        {
            Contract.Assert(rootTask != null, "Expected a non-null task"); 
            Contract.Assert(m_rootTask == null, "Cannot begin a query more than once");
            m_rootTask = rootTask; 
        } 

        //----------------------------------------------------------------------------------- 
        // Marks the end of a query's execution, waiting for all tasks to finish and
        // propagating any relevant exceptions.  Note that the full set of tasks must have
        // been initialized (with SetTask) before calling this.
        // 

        internal void QueryEnd(bool userInitiatedDispose) 
        { 
            Contract.Assert(m_rootTask != null);
            //Contract.Assert(Task.Current == null || (Task.Current != m_rootTask && Task.Current.Parent != m_rootTask)); 

            if (Interlocked.Exchange(ref m_alreadyEnded, 1) == 0)
            {
                // There are four cases: 
                // Case #1: Wait produced an exception that is not OCE(ct), or an AggregateException which is not full of OCE(ct) ==>  We rethrow.
                // Case #2: External cancellation has been requested ==> we'll manually throw OCE(externalToken). 
                // Case #3a: We are servicing a call to Dispose() (and possibly also external cancellation has been requested).. simply return. See bug 695173 
                // Case #3b: The enumerator has already been disposed (and possibly also external cancellation was requested).  Throw an ODE.
                // Case #4: No exceptions or explicit call to Dispose() by this caller ==> we just return. 

                // See also "InlinedAggregationOperator" which duplicates some of this logic for the aggregators.
                // See also "QueryOpeningEnumerator" which duplicates some of this logic.
                // See also "ExceptionAggregator" which duplicates some of this logic. 

                try 
                { 
                    // Wait for all the tasks to complete
                    // If any of the tasks ended in the Faulted stated, an AggregateException will be thrown. 
                    m_rootTask.Wait();
                }
                catch (AggregateException ae)
                { 
                    AggregateException flattenedAE = ae.Flatten();
                    bool allOCEsOnTrackedExternalCancellationToken = true; 
                    for (int i = 0; i < flattenedAE.InnerExceptions.Count; i++) 
                    {
                        OperationCanceledException oce = flattenedAE.InnerExceptions[i] as OperationCanceledException; 

                        // we only let it pass through iff:
                        // it is not null, not default, and matches the exact token we were given as being the external token
                        // and the external Token is actually canceled (ie not a spoof OCE(extCT) for a non-canceled extCT) 
                        if (oce == null ||
                            !oce.CancellationToken.IsCancellationRequested || 
                            oce.CancellationToken != m_cancellationState.ExternalCancellationToken) 
                        {
                            allOCEsOnTrackedExternalCancellationToken = false; 
                            break;
                        }
                    }
 
                    // if all the exceptions were OCE(externalToken), then we will propogate only a single OCE(externalToken) below
                    // otherwise, we flatten the aggregate (because the WaitAll above already aggregated) and rethrow. 
                    if (!allOCEsOnTrackedExternalCancellationToken) 
                        throw flattenedAE;  // Case #1
                } 
                finally
                {
                    m_rootTask.Dispose();
                } 

                if (m_cancellationState.MergedCancellationToken.IsCancellationRequested) 
                { 
                    // cancellation has occured but no user-delegate exceptions were detected
 
                    // NOTE: it is important that we see other state variables correctly here, and that
                    // read-reordering hasn't played havoc.
                    // This is OK because
                    //   1. all the state writes (eg in the Initiate* methods) are volatile writes (standard .NET MM) 
                    //   2. tokenCancellationRequested is backed by a volatile field, hence the reads below
                    //   won't get reordered about the read of token.IsCancellationRequested. 
 
                    // If the query has already been disposed, we don't want to throw an OCE (this is a fix for bug 695173.)
                    if (!m_cancellationState.TopLevelDisposedFlag.Value) 
                    {
                        CancellationState.ThrowWithStandardMessageIfCanceled(m_cancellationState.ExternalCancellationToken); // Case #2
                    }
 
                    //otherwise, given that there were no user-delegate exceptions (they would have been rethrown above),
                    //the only remaining situation is user-initiated dispose. 
                    Contract.Assert(m_cancellationState.TopLevelDisposedFlag.Value); 

                    // If we aren't actively disposing, that means somebody else previously disposed 
                    // of the enumerator. We must throw an ObjectDisposedException.
                    if (!userInitiatedDispose)
                    {
                        throw new ObjectDisposedException("enumerator", SR.GetString(SR.PLINQ_DisposeRequested)); // Case #3 
                    }
                } 
 
                // Case #4. nothing to do.
            } 
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK