DefaultIfEmptyQueryOperator.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / DefaultIfEmptyQueryOperator.cs / 1305376 / DefaultIfEmptyQueryOperator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// DefaultIfEmptyQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Threading;
using System.Diagnostics.Contracts; 
 
namespace System.Linq.Parallel
{ 
    /// 
    /// This operator just exposes elements directly from the underlying data source, if
    /// it's not empty, or yields a single default element if the data source is empty.
    /// There is a minimal amount of synchronization at the beginning, until all partitions 
    /// have registered whether their stream is empty or not. Once the 0th partition knows
    /// that at least one other partition is non-empty, it may proceed. Otherwise, it is 
    /// the 0th partition which yields the default value. 
    /// 
    ///  
    internal sealed class DefaultIfEmptyQueryOperator : UnaryQueryOperator
    {

        private readonly TSource m_defaultValue; // The default value to use (if empty). 

        //---------------------------------------------------------------------------------------- 
        // Initializes a new reverse operator. 
        //
        // Arguments: 
        //     child                - the child whose data we will reverse
        //

        internal DefaultIfEmptyQueryOperator(IEnumerable child, TSource defaultValue) 
            :base(child)
        { 
            Contract.Assert(child != null, "child data source cannot be null"); 
            m_defaultValue = defaultValue;
            SetOrdinalIndexState(ExchangeUtilities.Worse(Child.OrdinalIndexState, OrdinalIndexState.Correct)); 
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with 
        // partitions as needed.
        // 
 
        internal override QueryResults Open(QuerySettings settings, bool preferStriping)
        { 
            // We just open the child operator.
            QueryResults childQueryResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        } 

        internal override void  WrapPartitionedStream( 
            PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) 
        {
            int partitionCount = inputStream.PartitionCount; 

            // Generate the shared data.
            Shared sharedEmptyCount = new Shared(0);
            CountdownEvent sharedLatch = new CountdownEvent(partitionCount - 1); 

            PartitionedStream outputStream = 
                new PartitionedStream(partitionCount, inputStream.KeyComparer, OrdinalIndexState); 

            for (int i = 0; i < partitionCount; i++) 
            {
                outputStream[i] = new DefaultIfEmptyQueryOperatorEnumerator(
                    inputStream[i], m_defaultValue, i, partitionCount, sharedEmptyCount, sharedLatch, settings.CancellationState.MergedCancellationToken);
            } 

            recipient.Receive(outputStream); 
        } 

        //--------------------------------------------------------------------------------------- 
        // Returns an enumerable that represents the query executing sequentially.
        //

        internal override IEnumerable AsSequentialQuery(CancellationToken token) 
        {
            return Child.AsSequentialQuery(token).DefaultIfEmpty(m_defaultValue); 
        } 

        //--------------------------------------------------------------------------------------- 
        // Whether this operator performs a premature merge.
        //

        internal override bool LimitsParallelism 
        {
            get { return false; } 
        } 

 
        //----------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the default-if-empty operation.
        //
 
        class DefaultIfEmptyQueryOperatorEnumerator : QueryOperatorEnumerator
        { 
            private QueryOperatorEnumerator m_source; // The data source to enumerate. 
            private bool m_lookedForEmpty; // Whether this partition has looked for empty yet.
            private int m_partitionIndex; // This enumerator's partition index. 
            private int m_partitionCount; // The number of partitions.
            private TSource m_defaultValue; // The default value if the 0th partition is empty.

            // Data shared among partitions. 
            private Shared m_sharedEmptyCount; // The number of empty partitions.
            private CountdownEvent m_sharedLatch; // Shared latch, signaled when partitions process the 1st item. 
            private CancellationToken m_cancelToken; // Token used to cancel this operator. 

            //--------------------------------------------------------------------------------------- 
            // Instantiates a new select enumerator.
            //

            internal DefaultIfEmptyQueryOperatorEnumerator( 
                QueryOperatorEnumerator source, TSource defaultValue, int partitionIndex, int partitionCount,
                Shared sharedEmptyCount, CountdownEvent sharedLatch, CancellationToken cancelToken) 
            { 
                Contract.Assert(source != null);
                Contract.Assert(0 <= partitionIndex && partitionIndex < partitionCount); 
                Contract.Assert(partitionCount > 0);
                Contract.Assert(sharedEmptyCount != null);
                Contract.Assert(sharedLatch != null);
 
                m_source = source;
                m_defaultValue = defaultValue; 
                m_partitionIndex = partitionIndex; 
                m_partitionCount = partitionCount;
                m_sharedEmptyCount = sharedEmptyCount; 
                m_sharedLatch = sharedLatch;
                m_cancelToken = cancelToken;
            }
 
            //----------------------------------------------------------------------------------------
            // Straightforward IEnumerator methods. 
            // 

            internal override bool MoveNext(ref TSource currentElement, ref TKey currentKey) 
            {
                Contract.Assert(m_source != null);

                bool moveNextResult = m_source.MoveNext(ref currentElement, ref currentKey); 

                // There is special logic the first time this function is called. 
                if (!m_lookedForEmpty) 
                {
                    // Ensure we don't enter this loop again. 
                    m_lookedForEmpty = true;

                    if (!moveNextResult)
                    { 
                        if (m_partitionIndex == 0)
                        { 
                            // If this is the 0th partition, we must wait for all others.  Note: we could 
                            // actually do a wait-any here: if at least one other partition finds an element,
                            // there is strictly no need to wait.  But this would require extra coordination 
                            // which may or may not be worth the trouble.
                            m_sharedLatch.Wait(m_cancelToken);
                            m_sharedLatch.Dispose();
 
                            // Now see if there were any other partitions with data.
                            if (m_sharedEmptyCount.Value == m_partitionCount - 1) 
                            { 
                                // No data, we will yield the default value.
                                currentElement = m_defaultValue; 
                                currentKey = default(TKey);
                                return true;
                            }
                            else 
                            {
                                // Another partition has data, we are done. 
                                return false; 
                            }
                        } 
                        else
                        {
                            // Not the 0th partition, we will increment the shared empty counter.
                            Interlocked.Increment(ref m_sharedEmptyCount.Value); 
                        }
                    } 
 
                    // Every partition (but the 0th) will signal the latch the first time.
                    if (m_partitionIndex != 0) 
                    {
                        m_sharedLatch.Signal();
                    }
                } 

                return moveNextResult; 
            } 

            protected override void Dispose(bool disposing) 
            {
                m_source.Dispose();
            }
        } 
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK