IndexedWhereQueryOperator.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / IndexedWhereQueryOperator.cs / 1305376 / IndexedWhereQueryOperator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// IndexedWhereQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
using System.Threading; 
 
namespace System.Linq.Parallel
{ 
    /// 
    /// A variant of the Where operator that supplies element index while performing the
    /// filtering operation. This requires cooperation with partitioning and merging to
    /// guarantee ordering is preserved. 
    ///
    /// @ 
 

 

    internal sealed class IndexedWhereQueryOperator : UnaryQueryOperator
    {
 
        // Predicate function. Used to filter out non-matching elements during execution.
        private Func m_predicate; 
        private bool m_prematureMerge = false; // Whether to prematurely merge the input of this operator. 

        //---------------------------------------------------------------------------------------- 
        // Initializes a new where operator.
        //
        // Arguments:
        //    child         - the child operator or data source from which to pull data 
        //    predicate     - a delegate representing the predicate function
        // 
        // Assumptions: 
        //    predicate must be non null.
        // 

        internal IndexedWhereQueryOperator(IEnumerable child,
                                           Func predicate)
            :base(child) 
        {
            Contract.Assert(child != null, "child data source cannot be null"); 
            Contract.Assert(predicate != null, "need a filter function"); 

            m_predicate = predicate; 

            // In an indexed Select, elements must be returned in the order in which
            // indices were assigned.
            m_outputOrdered = true; 

            InitOrdinalIndexState(); 
        } 

        private void InitOrdinalIndexState() 
        {
            if (ExchangeUtilities.IsWorseThan(Child.OrdinalIndexState, OrdinalIndexState.Correct))
            {
                m_prematureMerge = true; 
            }
 
            SetOrdinalIndexState(OrdinalIndexState.Increasing); 
        }
 

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed. 
        //
 
        internal override QueryResults Open( 
            QuerySettings settings, bool preferStriping)
        { 
            QueryResults childQueryResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }
 
        internal override void WrapPartitionedStream(
            PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) 
        { 
            int partitionCount = inputStream.PartitionCount;
 
            // If the index is not correct, we need to reindex.
            PartitionedStream inputStreamInt;
            if (m_prematureMerge)
            { 
                ListQueryResults listResults = ExecuteAndCollectResults(inputStream, partitionCount, Child.OutputOrdered, preferStriping, settings);
                inputStreamInt = listResults.GetPartitionedStream(); 
            } 
            else
            { 
                Contract.Assert(typeof(TKey) == typeof(int));
                inputStreamInt = (PartitionedStream)(object)inputStream;
            }
 
            // Since the index is correct, the type of the index must be int
            PartitionedStream outputStream = 
                new PartitionedStream(partitionCount, Util.GetDefaultComparer(), OrdinalIndexState); 

            for (int i = 0; i < partitionCount; i++) 
            {
                outputStream[i] = new IndexedWhereQueryOperatorEnumerator(inputStreamInt[i], m_predicate, settings.CancellationState.MergedCancellationToken);
            }
 
            recipient.Receive(outputStream);
        } 
 

        //--------------------------------------------------------------------------------------- 
        // Returns an enumerable that represents the query executing sequentially.
        //

        internal override IEnumerable AsSequentialQuery(CancellationToken token) 
        {
            IEnumerable wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); 
            return wrappedChild.Where(m_predicate); 
        }
 

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        // 

        internal override bool LimitsParallelism 
        { 
            get { return m_prematureMerge; }
        } 

        //------------------------------------------------------------------------------------
        // An enumerator that implements the filtering logic.
        // 

        private class IndexedWhereQueryOperatorEnumerator : QueryOperatorEnumerator 
        { 

            private readonly QueryOperatorEnumerator m_source; // The data source to enumerate. 
            private readonly Func m_predicate; // The predicate used for filtering.
            private CancellationToken m_cancellationToken;
            private Shared m_outputLoopCount;
            //----------------------------------------------------------------------------------- 
            // Instantiates a new enumerator.
            // 
 
            internal IndexedWhereQueryOperatorEnumerator(QueryOperatorEnumerator source, Func predicate,
                CancellationToken cancellationToken) 
            {
                Contract.Assert(source != null);
                Contract.Assert(predicate != null);
                m_source = source; 
                m_predicate = predicate;
                m_cancellationToken = cancellationToken; 
            } 

            //------------------------------------------------------------------------------------ 
            // Moves to the next matching element in the underlying data stream.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey) 
            {
                Contract.Assert(m_predicate != null, "expected a compiled operator"); 
 
                // Iterate through the input until we reach the end of the sequence or find
                // an element matching the predicate. 

                if (m_outputLoopCount == null)
                    m_outputLoopCount = new Shared(0);
 
                while (m_source.MoveNext(ref currentElement, ref currentKey))
                { 
                    if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0) 
                        CancellationState.ThrowIfCanceled(m_cancellationToken);
 
                    if (m_predicate(currentElement, currentKey))
                    {
                        return true;
                    } 
                }
 
                return false; 
            }
 
            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_source != null);
                m_source.Dispose(); 
            }
 
        } 

    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK