QueryOpeningEnumerator.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / QueryOpeningEnumerator.cs / 1305376 / QueryOpeningEnumerator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// QueryOpeningEnumerator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections; 
using System.Collections.Generic;
using System.Threading; 
using System.Diagnostics.Contracts; 

namespace System.Linq.Parallel 
{
    /// <summary>
    /// A wrapper enumerator that just opens the query operator when MoveNext() is called for the
    /// first time. We use QueryOpeningEnumerator to call QueryOperator.GetOpenedEnumerator()
    /// lazily because once GetOpenedEnumerator() is called, PLINQ starts precomputing the
    /// results of the query.
    /// </summary>
    /// <typeparam name="TOutput">The type of the elements produced by the wrapped query.</typeparam>
    internal class QueryOpeningEnumerator<TOutput> : IEnumerator<TOutput>
    {
        private readonly QueryOperator<TOutput> m_queryOperator;
        private IEnumerator<TOutput> m_openedQueryEnumerator; // null until OpenQuery() has assigned it
        private QuerySettings m_querySettings;
        private readonly ParallelMergeOptions? m_mergeOptions;
        private readonly bool m_suppressOrderPreservation;
        private int m_moveNextIteration = 0;
        private bool m_hasQueryOpeningFailed;

        // -- Cancellation and Dispose fields --
        // Disposal of the QueryOpeningEnumerator can trigger internal cancellation, so it is important
        // that the internal cancellation signal is available both at this level and deep in query execution.
        // It is also useful to track the cause of cancellation so that appropriate exceptions etc. can be
        // thrown from the execution managers.
        // => Both the top-level disposed flag and the top-level cancellation signal are defined here, and are
        //    shared down to QuerySettings and to the QueryTaskGroupStates associated with actual task execution.
        // => Whilst these are the definitions, it is best to consider QuerySettings as the true owner of these.
        private readonly Shared<bool> m_topLevelDisposedFlag = new Shared<bool>(false);  // a Shared so that it can be referenced by others.

        // A top-level cancellation signal is required so that QueryOpeningEnumerator.Dispose() can tear things down.
        // This cancellation signal will be used as the actual internal signal in QueryTaskGroupState.
        private readonly CancellationTokenSource m_topLevelCancellationTokenSource = new CancellationTokenSource();


        /// <summary>
        /// Creates an enumerator over the given query operator without opening it.
        /// </summary>
        /// <param name="queryOperator">The query operator to open lazily; asserted to be non-null.</param>
        /// <param name="mergeOptions">Merge options forwarded to GetOpenedEnumerator() when the query is opened.</param>
        /// <param name="suppressOrderPreservation">Flag forwarded to GetOpenedEnumerator() when the query is opened.</param>
        internal QueryOpeningEnumerator(QueryOperator<TOutput> queryOperator, ParallelMergeOptions? mergeOptions, bool suppressOrderPreservation)
        {
            Contract.Assert(queryOperator != null);

            m_queryOperator = queryOperator;
            m_mergeOptions = mergeOptions;
            m_suppressOrderPreservation = suppressOrderPreservation;
        }

        /// <summary>
        /// Gets the current element of the opened query.
        /// </summary>
        /// <exception cref="InvalidOperationException">The query has not been opened yet.</exception>
        public TOutput Current
        {
            get
            {
                if (m_openedQueryEnumerator == null)
                {
                    throw new InvalidOperationException(SR.GetString(SR.PLINQ_CommonEnumerator_Current_NotStarted));
                }

                return m_openedQueryEnumerator.Current;
            }
        }

        /// <summary>
        /// Marks the enumerator as disposed, signals the top-level cancellation token source, and,
        /// if the query was opened, disposes the inner enumerator and cleans up the query settings state.
        /// </summary>
        public void Dispose()
        {
            m_topLevelDisposedFlag.Value = true;
            m_topLevelCancellationTokenSource.Cancel(); // initiate internal cancellation.
            if (m_openedQueryEnumerator != null)
            {
                m_openedQueryEnumerator.Dispose();
                m_querySettings.CleanStateAtQueryEnd();
            }

            // NOTE(review): this also runs when the query was never opened, in which case
            // m_querySettings has not been assigned by OpenQuery() - confirm that is intended.
            QueryLifecycle.LogicalQueryExecutionEnd(m_querySettings.QueryId);
        }

        /// <summary>
        /// Non-generic view of <see cref="Current"/>.
        /// </summary>
        object IEnumerator.Current
        {
            // The cast must target the generic interface; casting to the non-generic IEnumerator
            // would re-enter this very accessor and recurse without end.
            get { return ((IEnumerator<TOutput>)this).Current; }
        }

        /// <summary>
        /// Advances the enumerator, opening the query on the first call.
        /// </summary>
        /// <returns>The result of MoveNext() on the opened query enumerator.</returns>
        /// <exception cref="ObjectDisposedException">Dispose() has already been called.</exception>
        public bool MoveNext()
        {
            if (m_topLevelDisposedFlag.Value)
            {
                throw new ObjectDisposedException("enumerator", SR.GetString(SR.PLINQ_DisposeRequested));
            }


            // Note: if Dispose has been called on a different thread to the thread that is enumerating,
            // then there is a race where m_openedQueryEnumerator is instantiated but not disposed.
            // Best practice is that Dispose() should only be called by the owning thread, hence this cannot occur in correct usage scenarios.

            // Open the query operator if called for the first time.

            if (m_openedQueryEnumerator == null)
            {
                // To keep the MoveNext method body small, the code that executes first time only is in a separate method.
                // It appears that if the method becomes too large, we observe a performance regression. This may have
                // to do with method inlining. See bug 706485.
                OpenQuery();
            }

            bool innerMoveNextResult = m_openedQueryEnumerator.MoveNext();

            // This provides cancellation-testing for the consumer-side of the buffers that appears in each scenario:
            //   Non-order-preserving (defaultMergeHelper)
            //       - asynchronous channel (pipelining)
            //       - synchronous channel  (stop-and-go)
            //   Order-preserving (orderPreservingMergeHelper)
            //       - internal results buffer.
            // This moveNext is consuming data out of buffers, hence the inner moveNext is expected to be very fast.
            // => thus we only test for cancellation per-N-iterations (POLL_INTERVAL is applied as a bit mask).
            // NOTE: the cancellation check occurs after performing moveNext in case the cancellation caused no data
            //       to be produced. We need to ensure that users see an OCE rather than simply getting no data. (see Bug702254)
            if ((m_moveNextIteration & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowWithStandardMessageIfCanceled(
                    m_querySettings.CancellationState.ExternalCancellationToken);
            }

            m_moveNextIteration++;
            return innerMoveNextResult;
        }

        /// <summary>
        /// Opens the query and initializes m_openedQueryEnumerator and m_querySettings.
        /// Called from the first MoveNext call.
        /// </summary>
        /// <exception cref="InvalidOperationException">A previous attempt to open the query failed.</exception>
        private void OpenQuery()
        {
            // Avoid opening (and failing) twice.. not only would it be bad to re-enumerate some elements, but
            // the cancellation/disposed flags are most likely stale.
            if (m_hasQueryOpeningFailed)
                throw new InvalidOperationException(SR.GetString(SR.PLINQ_EnumerationPreviouslyFailed));

            try
            {
                // stuff in appropriate defaults for unspecified options.
                m_querySettings = m_queryOperator.SpecifiedQuerySettings
                    .WithPerExecutionSettings(m_topLevelCancellationTokenSource, m_topLevelDisposedFlag)
                    .WithDefaults();

                QueryLifecycle.LogicalQueryExecutionBegin(m_querySettings.QueryId);

                m_openedQueryEnumerator = m_queryOperator.GetOpenedEnumerator(
                    m_mergeOptions, m_suppressOrderPreservation, false, m_querySettings);


                // Now that we have opened the query, and got our hands on a supplied cancellation token
                // we can perform an early cancellation check so that we will not do any major work if the token is already canceled.
                CancellationState.ThrowWithStandardMessageIfCanceled(m_querySettings.CancellationState.ExternalCancellationToken);
            }
            catch
            {
                // Remember the failure so that a later MoveNext() does not try to open the query again.
                m_hasQueryOpeningFailed = true;
                throw;
            }
        }

        /// <summary>
        /// Not supported; always throws.
        /// </summary>
        /// <exception cref="NotSupportedException">Always.</exception>
        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// QueryOpeningEnumerator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections; 
using System.Collections.Generic;
using System.Threading; 
using System.Diagnostics.Contracts; 

namespace System.Linq.Parallel 
{
    /// <summary>
    /// A wrapper enumerator that just opens the query operator when MoveNext() is called for the
    /// first time. We use QueryOpeningEnumerator to call QueryOperator.GetOpenedEnumerator()
    /// lazily because once GetOpenedEnumerator() is called, PLINQ starts precomputing the
    /// results of the query.
    /// </summary>
    /// <typeparam name="TOutput">The type of the elements produced by the wrapped query.</typeparam>
    internal class QueryOpeningEnumerator<TOutput> : IEnumerator<TOutput>
    {
        private readonly QueryOperator<TOutput> m_queryOperator;
        private IEnumerator<TOutput> m_openedQueryEnumerator; // null until OpenQuery() has assigned it
        private QuerySettings m_querySettings;
        private readonly ParallelMergeOptions? m_mergeOptions;
        private readonly bool m_suppressOrderPreservation;
        private int m_moveNextIteration = 0;
        private bool m_hasQueryOpeningFailed;

        // -- Cancellation and Dispose fields --
        // Disposal of the QueryOpeningEnumerator can trigger internal cancellation, so it is important
        // that the internal cancellation signal is available both at this level and deep in query execution.
        // It is also useful to track the cause of cancellation so that appropriate exceptions etc. can be
        // thrown from the execution managers.
        // => Both the top-level disposed flag and the top-level cancellation signal are defined here, and are
        //    shared down to QuerySettings and to the QueryTaskGroupStates associated with actual task execution.
        // => Whilst these are the definitions, it is best to consider QuerySettings as the true owner of these.
        private readonly Shared<bool> m_topLevelDisposedFlag = new Shared<bool>(false);  // a Shared so that it can be referenced by others.

        // A top-level cancellation signal is required so that QueryOpeningEnumerator.Dispose() can tear things down.
        // This cancellation signal will be used as the actual internal signal in QueryTaskGroupState.
        private readonly CancellationTokenSource m_topLevelCancellationTokenSource = new CancellationTokenSource();


        /// <summary>
        /// Creates an enumerator over the given query operator without opening it.
        /// </summary>
        /// <param name="queryOperator">The query operator to open lazily; asserted to be non-null.</param>
        /// <param name="mergeOptions">Merge options forwarded to GetOpenedEnumerator() when the query is opened.</param>
        /// <param name="suppressOrderPreservation">Flag forwarded to GetOpenedEnumerator() when the query is opened.</param>
        internal QueryOpeningEnumerator(QueryOperator<TOutput> queryOperator, ParallelMergeOptions? mergeOptions, bool suppressOrderPreservation)
        {
            Contract.Assert(queryOperator != null);

            m_queryOperator = queryOperator;
            m_mergeOptions = mergeOptions;
            m_suppressOrderPreservation = suppressOrderPreservation;
        }

        /// <summary>
        /// Gets the current element of the opened query.
        /// </summary>
        /// <exception cref="InvalidOperationException">The query has not been opened yet.</exception>
        public TOutput Current
        {
            get
            {
                if (m_openedQueryEnumerator == null)
                {
                    throw new InvalidOperationException(SR.GetString(SR.PLINQ_CommonEnumerator_Current_NotStarted));
                }

                return m_openedQueryEnumerator.Current;
            }
        }

        /// <summary>
        /// Marks the enumerator as disposed, signals the top-level cancellation token source, and,
        /// if the query was opened, disposes the inner enumerator and cleans up the query settings state.
        /// </summary>
        public void Dispose()
        {
            m_topLevelDisposedFlag.Value = true;
            m_topLevelCancellationTokenSource.Cancel(); // initiate internal cancellation.
            if (m_openedQueryEnumerator != null)
            {
                m_openedQueryEnumerator.Dispose();
                m_querySettings.CleanStateAtQueryEnd();
            }

            // NOTE(review): this also runs when the query was never opened, in which case
            // m_querySettings has not been assigned by OpenQuery() - confirm that is intended.
            QueryLifecycle.LogicalQueryExecutionEnd(m_querySettings.QueryId);
        }

        /// <summary>
        /// Non-generic view of <see cref="Current"/>.
        /// </summary>
        object IEnumerator.Current
        {
            // The cast must target the generic interface; casting to the non-generic IEnumerator
            // would re-enter this very accessor and recurse without end.
            get { return ((IEnumerator<TOutput>)this).Current; }
        }

        /// <summary>
        /// Advances the enumerator, opening the query on the first call.
        /// </summary>
        /// <returns>The result of MoveNext() on the opened query enumerator.</returns>
        /// <exception cref="ObjectDisposedException">Dispose() has already been called.</exception>
        public bool MoveNext()
        {
            if (m_topLevelDisposedFlag.Value)
            {
                throw new ObjectDisposedException("enumerator", SR.GetString(SR.PLINQ_DisposeRequested));
            }


            // Note: if Dispose has been called on a different thread to the thread that is enumerating,
            // then there is a race where m_openedQueryEnumerator is instantiated but not disposed.
            // Best practice is that Dispose() should only be called by the owning thread, hence this cannot occur in correct usage scenarios.

            // Open the query operator if called for the first time.

            if (m_openedQueryEnumerator == null)
            {
                // To keep the MoveNext method body small, the code that executes first time only is in a separate method.
                // It appears that if the method becomes too large, we observe a performance regression. This may have
                // to do with method inlining. See bug 706485.
                OpenQuery();
            }

            bool innerMoveNextResult = m_openedQueryEnumerator.MoveNext();

            // This provides cancellation-testing for the consumer-side of the buffers that appears in each scenario:
            //   Non-order-preserving (defaultMergeHelper)
            //       - asynchronous channel (pipelining)
            //       - synchronous channel  (stop-and-go)
            //   Order-preserving (orderPreservingMergeHelper)
            //       - internal results buffer.
            // This moveNext is consuming data out of buffers, hence the inner moveNext is expected to be very fast.
            // => thus we only test for cancellation per-N-iterations (POLL_INTERVAL is applied as a bit mask).
            // NOTE: the cancellation check occurs after performing moveNext in case the cancellation caused no data
            //       to be produced. We need to ensure that users see an OCE rather than simply getting no data. (see Bug702254)
            if ((m_moveNextIteration & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowWithStandardMessageIfCanceled(
                    m_querySettings.CancellationState.ExternalCancellationToken);
            }

            m_moveNextIteration++;
            return innerMoveNextResult;
        }

        /// <summary>
        /// Opens the query and initializes m_openedQueryEnumerator and m_querySettings.
        /// Called from the first MoveNext call.
        /// </summary>
        /// <exception cref="InvalidOperationException">A previous attempt to open the query failed.</exception>
        private void OpenQuery()
        {
            // Avoid opening (and failing) twice.. not only would it be bad to re-enumerate some elements, but
            // the cancellation/disposed flags are most likely stale.
            if (m_hasQueryOpeningFailed)
                throw new InvalidOperationException(SR.GetString(SR.PLINQ_EnumerationPreviouslyFailed));

            try
            {
                // stuff in appropriate defaults for unspecified options.
                m_querySettings = m_queryOperator.SpecifiedQuerySettings
                    .WithPerExecutionSettings(m_topLevelCancellationTokenSource, m_topLevelDisposedFlag)
                    .WithDefaults();

                QueryLifecycle.LogicalQueryExecutionBegin(m_querySettings.QueryId);

                m_openedQueryEnumerator = m_queryOperator.GetOpenedEnumerator(
                    m_mergeOptions, m_suppressOrderPreservation, false, m_querySettings);


                // Now that we have opened the query, and got our hands on a supplied cancellation token
                // we can perform an early cancellation check so that we will not do any major work if the token is already canceled.
                CancellationState.ThrowWithStandardMessageIfCanceled(m_querySettings.CancellationState.ExternalCancellationToken);
            }
            catch
            {
                // Remember the failure so that a later MoveNext() does not try to open the query again.
                m_hasQueryOpeningFailed = true;
                throw;
            }
        }

        /// <summary>
        /// Not supported; always throws.
        /// </summary>
        /// <exception cref="NotSupportedException">Always.</exception>
        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK