IntersectQueryOperator.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Binary / IntersectQueryOperator.cs / 1305376 / IntersectQueryOperator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// IntersectQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
using System.Threading; 
 
namespace System.Linq.Parallel
{ 
    /// <summary>
    /// Operator that yields the intersection of two data sources.
    /// </summary>
    /// <typeparam name="TInputOutput">The element type of both inputs and of the output.</typeparam>
    //
    // NOTE(review): the generic parameter/argument lists in this class were stripped from the
    // text this file was recovered from (e.g. "PartitionedStream, TLeftKey>") and have been
    // reconstructed. Names that are not otherwise visible in this file -- NoKeyMemoizationRequired,
    // TRightKey, and the three type arguments of BinaryQueryOperator -- must be confirmed against
    // the declarations elsewhere in the project.
    internal sealed class IntersectQueryOperator<TInputOutput> :
        BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
    {

        private readonly IEqualityComparer<TInputOutput> m_comparer; // An equality comparer.

        //----------------------------------------------------------------------------------------
        // Constructs a new intersection operator.
        //
        // Arguments:
        //     left     - the left child data source (asserted non-null)
        //     right    - the right child data source (asserted non-null)
        //     comparer - the equality comparer handed to the hash repartitioning and to the
        //                per-partition lookup structures; stored as-is (no null check here)
        //
        // The operator's output is ordered iff the left child's output is ordered, and its
        // ordinal index state is always set to Shuffled.
        //

        internal IntersectQueryOperator(ParallelQuery<TInputOutput> left, ParallelQuery<TInputOutput> right, IEqualityComparer<TInputOutput> comparer)
            : base(left, right)
        {
            Contract.Assert(left != null && right != null, "child data sources cannot be null");

            m_comparer = comparer;
            m_outputOrdered = LeftChild.OutputOrdered;

            SetOrdinalIndex(OrdinalIndexState.Shuffled);
        }

        //----------------------------------------------------------------------------------------
        // Opens both child operators (left first, then right) and wraps their results.
        //

        internal override QueryResults<TInputOutput> Open(
            QuerySettings settings, bool preferStriping)
        {
            // We just open our child operators, left and then right.  Do not propagate the preferStriping value, but
            // instead explicitly set it to false. Regardless of whether the parent prefers striping or range
            // partitioning, the output will be hash-partitioned.
            QueryResults<TInputOutput> leftChildResults = LeftChild.Open(settings, false);
            QueryResults<TInputOutput> rightChildResults = RightChild.Open(settings, false);

            return new BinaryQueryOperatorResults(leftChildResults, rightChildResults, this, settings, false);
        }

        //----------------------------------------------------------------------------------------
        // Hash-repartitions the left stream (preserving order keys when the output is ordered)
        // and delegates the rest of the work to WrapPartitionedStreamHelper.
        //
        // NOTE(review): the second type argument of the HashRepartition* calls is reconstructed;
        // it has to be given explicitly because the key-selector arguments are null.
        //

        public override void WrapPartitionedStream<TLeftKey, TRightKey>(
            PartitionedStream<TInputOutput, TLeftKey> leftPartitionedStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, bool preferStriping, QuerySettings settings)
        {
            Contract.Assert(leftPartitionedStream.PartitionCount == rightPartitionedStream.PartitionCount);

            if (OutputOrdered)
            {
                // Ordered: the left order keys (TLeftKey) survive the repartitioning.
                WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
                    ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftPartitionedStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken),
                    rightPartitionedStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
            }
            else
            {
                // Unordered: the repartitioned left stream is keyed by int.
                WrapPartitionedStreamHelper<int, TRightKey>(
                    ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftPartitionedStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken),
                    rightPartitionedStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
            }
        }

        //---------------------------------------------------------------------------------------
        // This is a helper method. WrapPartitionedStream decides what type TLeftKey is going
        // to be, and then calls this method with that key as a generic parameter.
        //
        // The right stream is hash-repartitioned with the same comparer, and partition i of the
        // left stream is then paired with partition i of the right stream under one intersect
        // enumerator. The resulting stream is handed to outputRecipient.
        //

        private void WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, CancellationToken cancellationToken)
        {
            int partitionCount = leftHashStream.PartitionCount;

            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightHashStream =
                ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
                    rightPartitionedStream, null, null, m_comparer, cancellationToken);

            PartitionedStream<TInputOutput, TLeftKey> outputStream =
                new PartitionedStream<TInputOutput, TLeftKey>(partitionCount, leftHashStream.KeyComparer, OrdinalIndexState.Shuffled);
            for (int i = 0; i < partitionCount; i++)
            {
                if (OutputOrdered)
                {
                    outputStream[i] = new OrderedIntersectQueryOperatorEnumerator<TLeftKey>(
                        leftHashStream[i], rightHashStream[i], m_comparer, leftHashStream.KeyComparer, cancellationToken);
                }
                else
                {
                    // The unordered enumerator always produces int keys, so it is not statically
                    // convertible to an enumerator keyed by TLeftKey; the cast goes through object.
                    // It relies on TLeftKey being int on this path (see WrapPartitionedStream).
                    outputStream[i] = (QueryOperatorEnumerator<TInputOutput, TLeftKey>)(object)
                            new IntersectQueryOperatorEnumerator<TLeftKey>(leftHashStream[i], rightHashStream[i], m_comparer, cancellationToken);
                }
            }

            outputRecipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //---------------------------------------------------------------------------------------
        // This enumerator performs the intersection operation incrementally. It does this by
        // maintaining a history -- in the form of a set -- of all data already seen. It then
        // only returns elements that are seen twice (returning each one only once).
        //

        class IntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, int>
        {

            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> m_rightSource; // Right data source.
            private IEqualityComparer<TInputOutput> m_comparer; // Comparer to use for equality/hash-coding.
            private Set<TInputOutput> m_hashLookup; // The hash lookup, used to produce the intersection.
            private CancellationToken m_cancellationToken;
            private Shared<int> m_outputLoopCount; // Left-loop iteration counter; persists across MoveNext calls.

            //----------------------------------------------------------------------------------------
            // Instantiates a new intersection enumerator over one pair of partitions.
            //

            internal IntersectQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
                IEqualityComparer<TInputOutput> comparer, CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(rightSource != null);

                m_leftSource = leftSource;
                m_rightSource = rightSource;
                m_comparer = comparer;
                m_cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Produces the next element of the intersection. On the first call the right source
            // is drained into a set; every call then advances the left source until it finds an
            // element that is still in the set. Returns false once the left source is exhausted.
            //
            // Note that currentKey is only written in DEBUG builds (with a marker value).
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
            {
                Contract.Assert(m_leftSource != null);
                Contract.Assert(m_rightSource != null);

                // Build the set out of the right data source, if we haven't already.

                if (m_hashLookup == null)
                {
                    m_outputLoopCount = new Shared<int>(0);
                    m_hashLookup = new Set<TInputOutput>(m_comparer);

                    Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    int rightKeyUnused = default(int);

                    int i = 0;
                    while (m_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
                    {
                        // Poll for cancellation whenever the counter's POLL_INTERVAL bits are all zero.
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                            CancellationState.ThrowIfCanceled(m_cancellationToken);

                        m_hashLookup.Add(rightElement.First);
                    }
                }

                // Now iterate over the left data source, looking for matches.
                Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                TLeftKey keyUnused = default(TLeftKey);

                while (m_leftSource.MoveNext(ref leftElement, ref keyUnused))
                {
                    if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
                            CancellationState.ThrowIfCanceled(m_cancellationToken);

                    // If we found the element in our set, we can yield it to the caller. We also
                    // remove it from the set so we know we've returned it once already and never
                    // will again.
                    if (m_hashLookup.Contains(leftElement.First))
                    {
                        m_hashLookup.Remove(leftElement.First);
                        currentElement = leftElement.First;
#if DEBUG
                        currentKey = unchecked((int)0xdeadbeef);
#endif
                        return true;
                    }
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes both underlying sources.
            //

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_leftSource != null && m_rightSource != null);
                m_leftSource.Dispose();
                m_rightSource.Dispose();
            }
        }

        //----------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //

        internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TInputOutput> wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
            IEnumerable<TInputOutput> wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);
            return wrappedLeftChild.Intersect(wrappedRightChild, m_comparer);
        }

        //---------------------------------------------------------------------------------------
        // Order-preserving variant of the intersection enumerator. It hashes the LEFT source,
        // remembering for each distinct element the smallest order key seen together with the
        // element value that carried it, and then walks the right source, yielding the stored
        // left element and key for each match.
        //

        class OrderedIntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, TLeftKey>
        {

            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> m_rightSource; // Right data source.
            private IEqualityComparer<Wrapper<TInputOutput>> m_comparer; // Comparer to use for equality/hash-coding.
            private IComparer<TLeftKey> m_leftKeyComparer; // Comparer to use to determine ordering of order keys.
            private Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>> m_hashLookup; // The hash lookup, used to produce the intersection.
            private CancellationToken m_cancellationToken;

            //----------------------------------------------------------------------------------------
            // Instantiates a new ordered intersection enumerator over one pair of partitions.
            // The element comparer is wrapped so it can compare the Wrapper keys of the lookup.
            //

            internal OrderedIntersectQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
                IEqualityComparer<TInputOutput> comparer, IComparer<TLeftKey> leftKeyComparer,
                CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(rightSource != null);

                m_leftSource = leftSource;
                m_rightSource = rightSource;
                m_comparer = new WrapperEqualityComparer<TInputOutput>(comparer);
                m_leftKeyComparer = leftKeyComparer;
                m_cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, left and then right, to produce the intersection.
            // Returns false once the right source is exhausted.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref TLeftKey currentKey)
            {
                Contract.Assert(m_leftSource != null);
                Contract.Assert(m_rightSource != null);

                // Build the set out of the left data source, if we haven't already.
                int i = 0;
                if (m_hashLookup == null)
                {
                    m_hashLookup = new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>>(m_comparer);

                    Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    TLeftKey leftKey = default(TLeftKey);
                    while (m_leftSource.MoveNext(ref leftElement, ref leftKey))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                            CancellationState.ThrowIfCanceled(m_cancellationToken);

                        // For each element, we track the smallest order key for that element that we saw so far
                        Pair<TInputOutput, TLeftKey> oldEntry;
                        Wrapper<TInputOutput> wrappedLeftElem = new Wrapper<TInputOutput>(leftElement.First);

                        // If this is the first occurrence of this element, or the order key is lower than all keys we saw previously,
                        // update the order key for this element.
                        if (!m_hashLookup.TryGetValue(wrappedLeftElem, out oldEntry) || m_leftKeyComparer.Compare(leftKey, oldEntry.Second) < 0)
                        {
                            // For each "elem" value, we store the smallest key, and the element value that had that key.
                            // Note that even though two element values are "equal" according to the EqualityComparer,
                            // we still cannot choose arbitrarily which of the two to yield.
                            m_hashLookup[wrappedLeftElem] = new Pair<TInputOutput, TLeftKey>(leftElement.First, leftKey);
                        }
                    }
                }

                // Now iterate over the right data source, looking for matches.
                Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                int rightKeyUnused = default(int);
                while (m_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
                {
                    if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        CancellationState.ThrowIfCanceled(m_cancellationToken);

                    // If we found the element in our lookup, we can yield the stored left element
                    // and its order key to the caller. We also remove the entry so we know we've
                    // returned it once already and never will again.

                    Pair<TInputOutput, TLeftKey> entry;
                    Wrapper<TInputOutput> wrappedRightElem = new Wrapper<TInputOutput>(rightElement.First);

                    if (m_hashLookup.TryGetValue(wrappedRightElem, out entry))
                    {
                        currentElement = entry.First;
                        currentKey = entry.Second;

                        m_hashLookup.Remove(new Wrapper<TInputOutput>(entry.First));
                        return true;
                    }
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes both underlying sources.
            //

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_leftSource != null && m_rightSource != null);
                m_leftSource.Dispose();
                m_rightSource.Dispose();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK