BinaryQueryOperator.cs source code in C# .NET

Source code for the .NET Framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / BinaryQueryOperator.cs / 1305376 / BinaryQueryOperator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// BinaryQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
 
namespace System.Linq.Parallel 
{
    /// <summary>
    /// The base class from which all binary query operators derive, that is, those that
    /// have two child operators. This introduces some convenience methods for those
    /// classes, as well as any state common to all subclasses.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this listing appears to have lost its generic type arguments (the
    /// comments below mention TLeftKey and TRightKey, which appear nowhere in the code).
    /// Confirm against the original source before compiling.
    /// </remarks>
    internal abstract class BinaryQueryOperator : QueryOperator
    {
        // The two child operators for the current node.
        private readonly QueryOperator m_leftChild;
        private readonly QueryOperator m_rightChild;

        // Ordinal index state reported by this operator. It starts out as Shuffled and is
        // only changed through SetOrdinalIndex.
        private OrdinalIndexState m_indexState = OrdinalIndexState.Shuffled;

        //----------------------------------------------------------------------------------------
        // Stores a set of child operators on this query node.
        //

        // Converts both queries with QueryOperator.AsQueryOperator and delegates to the
        // operator-based constructor below.
        internal BinaryQueryOperator(ParallelQuery leftChild, ParallelQuery rightChild)
            : this(QueryOperator.AsQueryOperator(leftChild), QueryOperator.AsQueryOperator(rightChild))
        {
        }

        // Stores the two child operators and hands the base class the merge of their
        // specified query settings. The first base argument is always false here; its
        // meaning is defined by the base constructor, which is not visible in this file.
        internal BinaryQueryOperator(QueryOperator leftChild, QueryOperator rightChild)
            : base(false, MergeChildSettings(leftChild, rightChild))
        {
            m_leftChild = leftChild;
            m_rightChild = rightChild;
        }

        // Merges the settings specified on the two children. The null check lives here,
        // rather than in the constructor body, because the base(...) initializer is
        // evaluated before the constructor body runs: an assertion placed in the body could
        // never fire ahead of the dereferences below.
        private static QuerySettings MergeChildSettings(QueryOperator leftChild, QueryOperator rightChild)
        {
            Contract.Assert(leftChild != null && rightChild != null);
            return leftChild.SpecifiedQuerySettings.Merge(rightChild.SpecifiedQuerySettings);
        }

        // The left child operator supplied at construction.
        internal QueryOperator LeftChild
        {
            get { return m_leftChild; }
        }

        // The right child operator supplied at construction.
        internal QueryOperator RightChild
        {
            get { return m_rightChild; }
        }

        // The ordinal index state of this operator: Shuffled unless SetOrdinalIndex was called.
        internal sealed override OrdinalIndexState OrdinalIndexState
        {
            get { return m_indexState; }
        }

        // Lets a derived operator replace the ordinal index state reported above.
        protected void SetOrdinalIndex(OrdinalIndexState indexState)
        {
            m_indexState = indexState;
        }


        //---------------------------------------------------------------------------------------
        // This method accepts two child partitioned streams, and constructs an output
        // partitioned stream. However, instead of returning the transformed partitioned
        // stream, we pass it to a recipient object by calling outputRecipient.Receive(..). That
        // way, we can "return" a partitioned stream that uses an order key selected by the operator.
        //
        public abstract void WrapPartitionedStream(
            PartitionedStream leftPartitionedStream, PartitionedStream rightPartitionedStream,
            IPartitionedStreamRecipient outputRecipient, bool preferStriping, QuerySettings settings);

        //---------------------------------------------------------------------------------------
        // Implementation of QueryResults for a binary operator. The results will not be indexible
        // unless a derived class provides that functionality.
        //

        internal class BinaryQueryOperatorResults : QueryResults
        {
            protected QueryResults m_leftChildQueryResults; // Results of the left child query
            protected QueryResults m_rightChildQueryResults; // Results of the right child query
            private readonly BinaryQueryOperator m_op; // Operator that generated these results
            private readonly QuerySettings m_settings; // Settings collected from the query
            private readonly bool m_preferStriping; // If the results are indexible, should we use striping when partitioning them

            // Captures the child results, the operator that produced them, and the settings
            // that GivePartitionedStream uses later.
            internal BinaryQueryOperatorResults(
                QueryResults leftChildQueryResults, QueryResults rightChildQueryResults,
                BinaryQueryOperator op, QuerySettings settings,
                bool preferStriping)
            {
                m_leftChildQueryResults = leftChildQueryResults;
                m_rightChildQueryResults = rightChildQueryResults;
                m_op = op;
                m_settings = settings;
                m_preferStriping = preferStriping;
            }

            // Produces the partitioned stream for this operator and hands it to 'recipient'.
            // Exactly one of three paths is taken, checked in this order:
            //   1. sequential fallback, 2. indexible results, 3. wrapping the child partitions.
            internal override void GivePartitionedStream(IPartitionedStreamRecipient recipient)
            {
                // The results and the operator must agree on whether the output is indexible.
                Contract.Assert(IsIndexible == (m_op.OrdinalIndexState == OrdinalIndexState.Indexible));

                if (m_settings.ExecutionMode.Value == ParallelExecutionMode.Default && m_op.LimitsParallelism)
                {
                    // We need to run the query sequentially up to and including this operator,
                    // then partition the sequential output.
                    IEnumerable opSequential = m_op.AsSequentialQuery(m_settings.CancellationState.ExternalCancellationToken);
                    PartitionedStream result = ExchangeUtilities.PartitionDataSource(
                        opSequential, m_settings.DegreeOfParallelism.Value, m_preferStriping);
                    recipient.Receive(result);
                }
                else if (IsIndexible)
                {
                    // The output of this operator is indexible. Partition these results directly
                    // and pass the partitioned output into the IPartitionedStreamRecipient.
                    PartitionedStream result = ExchangeUtilities.PartitionDataSource(this, m_settings.DegreeOfParallelism.Value, m_preferStriping);
                    recipient.Receive(result);
                }
                else
                {
                    // The common case: get partitions from the children and wrap each partition.
                    // The left child is asked first; LeftChildResultsRecipient then asks the right child.
                    m_leftChildQueryResults.GivePartitionedStream(new LeftChildResultsRecipient(recipient, this, m_preferStriping, m_settings));
                }
            }

            //---------------------------------------------------------------------------------------
            // LeftChildResultsRecipient is a recipient of a partitioned stream. It receives a partitioned
            // stream from the left child operator, and passes the results along to a
            // RightChildResultsRecipient.
            //

            private class LeftChildResultsRecipient : IPartitionedStreamRecipient
            {
                private readonly IPartitionedStreamRecipient m_outputRecipient; // Final recipient of the wrapped stream
                private readonly BinaryQueryOperatorResults m_results; // Results object that started the chain
                private readonly bool m_preferStriping;
                private readonly QuerySettings m_settings;

                internal LeftChildResultsRecipient(IPartitionedStreamRecipient outputRecipient, BinaryQueryOperatorResults results,
                                                   bool preferStriping, QuerySettings settings)
                {
                    m_outputRecipient = outputRecipient;
                    m_results = results;
                    m_preferStriping = preferStriping;
                    m_settings = settings;
                }

                // Called with the left child's partitioned stream. Bundles it into a
                // RightChildResultsRecipient and asks the right child for its stream.
                public void Receive(PartitionedStream source)
                {
                    RightChildResultsRecipient rightChildRecipient =
                        new RightChildResultsRecipient(m_outputRecipient, m_results.m_op, source, m_preferStriping, m_settings);
                    m_results.m_rightChildQueryResults.GivePartitionedStream(rightChildRecipient);
                }
            }

            //----------------------------------------------------------------------------------------
            // RightChildResultsRecipient receives a partitioned stream from the right child operator.
            // Also, the partitioned stream from the left child operator is passed into the constructor.
            // So, Receive has partitioned streams for both child operators, and also is called in
            // a context where it has access to both TLeftKey and TRightKey. Then, it passes both
            // streams (as arguments) and key types (as type arguments) to the operator's
            // WrapPartitionedStream method.
            //

            private class RightChildResultsRecipient : IPartitionedStreamRecipient
            {
                private readonly IPartitionedStreamRecipient m_outputRecipient; // Final recipient of the wrapped stream
                private readonly PartitionedStream m_leftPartitionedStream; // Stream already received from the left child
                private readonly BinaryQueryOperator m_op; // Operator whose WrapPartitionedStream is invoked
                private readonly bool m_preferStriping;
                private readonly QuerySettings m_settings;

                internal RightChildResultsRecipient(
                    IPartitionedStreamRecipient outputRecipient, BinaryQueryOperator op,
                    PartitionedStream leftPartitionedStream, bool preferStriping, QuerySettings settings)
                {
                    m_outputRecipient = outputRecipient;
                    m_op = op;
                    m_preferStriping = preferStriping;
                    m_leftPartitionedStream = leftPartitionedStream;
                    m_settings = settings;
                }

                // Called with the right child's partitioned stream. Both streams are now
                // available, so the operator can wrap them and deliver the output.
                public void Receive(PartitionedStream rightPartitionedStream)
                {
                    m_op.WrapPartitionedStream(m_leftPartitionedStream, rightPartitionedStream, m_outputRecipient, m_preferStriping, m_settings);
                }
            }

        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// BinaryQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
 
namespace System.Linq.Parallel 
{
    /// <summary>
    /// The base class from which all binary query operators derive, that is, those that
    /// have two child operators. This introduces some convenience methods for those
    /// classes, as well as any state common to all subclasses.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this listing appears to have lost its generic type arguments (the
    /// comments below mention TLeftKey and TRightKey, which appear nowhere in the code).
    /// Confirm against the original source before compiling.
    /// </remarks>
    internal abstract class BinaryQueryOperator : QueryOperator
    {
        // The two child operators for the current node.
        private readonly QueryOperator m_leftChild;
        private readonly QueryOperator m_rightChild;

        // Ordinal index state reported by this operator. It starts out as Shuffled and is
        // only changed through SetOrdinalIndex.
        private OrdinalIndexState m_indexState = OrdinalIndexState.Shuffled;

        //----------------------------------------------------------------------------------------
        // Stores a set of child operators on this query node.
        //

        // Converts both queries with QueryOperator.AsQueryOperator and delegates to the
        // operator-based constructor below.
        internal BinaryQueryOperator(ParallelQuery leftChild, ParallelQuery rightChild)
            : this(QueryOperator.AsQueryOperator(leftChild), QueryOperator.AsQueryOperator(rightChild))
        {
        }

        // Stores the two child operators and hands the base class the merge of their
        // specified query settings. The first base argument is always false here; its
        // meaning is defined by the base constructor, which is not visible in this file.
        internal BinaryQueryOperator(QueryOperator leftChild, QueryOperator rightChild)
            : base(false, MergeChildSettings(leftChild, rightChild))
        {
            m_leftChild = leftChild;
            m_rightChild = rightChild;
        }

        // Merges the settings specified on the two children. The null check lives here,
        // rather than in the constructor body, because the base(...) initializer is
        // evaluated before the constructor body runs: an assertion placed in the body could
        // never fire ahead of the dereferences below.
        private static QuerySettings MergeChildSettings(QueryOperator leftChild, QueryOperator rightChild)
        {
            Contract.Assert(leftChild != null && rightChild != null);
            return leftChild.SpecifiedQuerySettings.Merge(rightChild.SpecifiedQuerySettings);
        }

        // The left child operator supplied at construction.
        internal QueryOperator LeftChild
        {
            get { return m_leftChild; }
        }

        // The right child operator supplied at construction.
        internal QueryOperator RightChild
        {
            get { return m_rightChild; }
        }

        // The ordinal index state of this operator: Shuffled unless SetOrdinalIndex was called.
        internal sealed override OrdinalIndexState OrdinalIndexState
        {
            get { return m_indexState; }
        }

        // Lets a derived operator replace the ordinal index state reported above.
        protected void SetOrdinalIndex(OrdinalIndexState indexState)
        {
            m_indexState = indexState;
        }


        //---------------------------------------------------------------------------------------
        // This method accepts two child partitioned streams, and constructs an output
        // partitioned stream. However, instead of returning the transformed partitioned
        // stream, we pass it to a recipient object by calling outputRecipient.Receive(..). That
        // way, we can "return" a partitioned stream that uses an order key selected by the operator.
        //
        public abstract void WrapPartitionedStream(
            PartitionedStream leftPartitionedStream, PartitionedStream rightPartitionedStream,
            IPartitionedStreamRecipient outputRecipient, bool preferStriping, QuerySettings settings);

        //---------------------------------------------------------------------------------------
        // Implementation of QueryResults for a binary operator. The results will not be indexible
        // unless a derived class provides that functionality.
        //

        internal class BinaryQueryOperatorResults : QueryResults
        {
            protected QueryResults m_leftChildQueryResults; // Results of the left child query
            protected QueryResults m_rightChildQueryResults; // Results of the right child query
            private readonly BinaryQueryOperator m_op; // Operator that generated these results
            private readonly QuerySettings m_settings; // Settings collected from the query
            private readonly bool m_preferStriping; // If the results are indexible, should we use striping when partitioning them

            // Captures the child results, the operator that produced them, and the settings
            // that GivePartitionedStream uses later.
            internal BinaryQueryOperatorResults(
                QueryResults leftChildQueryResults, QueryResults rightChildQueryResults,
                BinaryQueryOperator op, QuerySettings settings,
                bool preferStriping)
            {
                m_leftChildQueryResults = leftChildQueryResults;
                m_rightChildQueryResults = rightChildQueryResults;
                m_op = op;
                m_settings = settings;
                m_preferStriping = preferStriping;
            }

            // Produces the partitioned stream for this operator and hands it to 'recipient'.
            // Exactly one of three paths is taken, checked in this order:
            //   1. sequential fallback, 2. indexible results, 3. wrapping the child partitions.
            internal override void GivePartitionedStream(IPartitionedStreamRecipient recipient)
            {
                // The results and the operator must agree on whether the output is indexible.
                Contract.Assert(IsIndexible == (m_op.OrdinalIndexState == OrdinalIndexState.Indexible));

                if (m_settings.ExecutionMode.Value == ParallelExecutionMode.Default && m_op.LimitsParallelism)
                {
                    // We need to run the query sequentially up to and including this operator,
                    // then partition the sequential output.
                    IEnumerable opSequential = m_op.AsSequentialQuery(m_settings.CancellationState.ExternalCancellationToken);
                    PartitionedStream result = ExchangeUtilities.PartitionDataSource(
                        opSequential, m_settings.DegreeOfParallelism.Value, m_preferStriping);
                    recipient.Receive(result);
                }
                else if (IsIndexible)
                {
                    // The output of this operator is indexible. Partition these results directly
                    // and pass the partitioned output into the IPartitionedStreamRecipient.
                    PartitionedStream result = ExchangeUtilities.PartitionDataSource(this, m_settings.DegreeOfParallelism.Value, m_preferStriping);
                    recipient.Receive(result);
                }
                else
                {
                    // The common case: get partitions from the children and wrap each partition.
                    // The left child is asked first; LeftChildResultsRecipient then asks the right child.
                    m_leftChildQueryResults.GivePartitionedStream(new LeftChildResultsRecipient(recipient, this, m_preferStriping, m_settings));
                }
            }

            //---------------------------------------------------------------------------------------
            // LeftChildResultsRecipient is a recipient of a partitioned stream. It receives a partitioned
            // stream from the left child operator, and passes the results along to a
            // RightChildResultsRecipient.
            //

            private class LeftChildResultsRecipient : IPartitionedStreamRecipient
            {
                private readonly IPartitionedStreamRecipient m_outputRecipient; // Final recipient of the wrapped stream
                private readonly BinaryQueryOperatorResults m_results; // Results object that started the chain
                private readonly bool m_preferStriping;
                private readonly QuerySettings m_settings;

                internal LeftChildResultsRecipient(IPartitionedStreamRecipient outputRecipient, BinaryQueryOperatorResults results,
                                                   bool preferStriping, QuerySettings settings)
                {
                    m_outputRecipient = outputRecipient;
                    m_results = results;
                    m_preferStriping = preferStriping;
                    m_settings = settings;
                }

                // Called with the left child's partitioned stream. Bundles it into a
                // RightChildResultsRecipient and asks the right child for its stream.
                public void Receive(PartitionedStream source)
                {
                    RightChildResultsRecipient rightChildRecipient =
                        new RightChildResultsRecipient(m_outputRecipient, m_results.m_op, source, m_preferStriping, m_settings);
                    m_results.m_rightChildQueryResults.GivePartitionedStream(rightChildRecipient);
                }
            }

            //----------------------------------------------------------------------------------------
            // RightChildResultsRecipient receives a partitioned stream from the right child operator.
            // Also, the partitioned stream from the left child operator is passed into the constructor.
            // So, Receive has partitioned streams for both child operators, and also is called in
            // a context where it has access to both TLeftKey and TRightKey. Then, it passes both
            // streams (as arguments) and key types (as type arguments) to the operator's
            // WrapPartitionedStream method.
            //

            private class RightChildResultsRecipient : IPartitionedStreamRecipient
            {
                private readonly IPartitionedStreamRecipient m_outputRecipient; // Final recipient of the wrapped stream
                private readonly PartitionedStream m_leftPartitionedStream; // Stream already received from the left child
                private readonly BinaryQueryOperator m_op; // Operator whose WrapPartitionedStream is invoked
                private readonly bool m_preferStriping;
                private readonly QuerySettings m_settings;

                internal RightChildResultsRecipient(
                    IPartitionedStreamRecipient outputRecipient, BinaryQueryOperator op,
                    PartitionedStream leftPartitionedStream, bool preferStriping, QuerySettings settings)
                {
                    m_outputRecipient = outputRecipient;
                    m_op = op;
                    m_preferStriping = preferStriping;
                    m_leftPartitionedStream = leftPartitionedStream;
                    m_settings = settings;
                }

                // Called with the right child's partitioned stream. Both streams are now
                // available, so the operator can wrap them and deliver the output.
                public void Receive(PartitionedStream rightPartitionedStream)
                {
                    m_op.WrapPartitionedStream(m_leftPartitionedStream, rightPartitionedStream, m_outputRecipient, m_preferStriping, m_settings);
                }
            }

        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK