PartitionerQueryOperator.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / PartitionerQueryOperator.cs / 1305376 / PartitionerQueryOperator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// PartitionerQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System; 
using System.Collections.Generic;
using System.Linq; 
using System.Text; 
using System.Collections.Concurrent;
using System.Linq.Parallel; 
using System.Diagnostics.Contracts;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// A QueryOperator that represents the output of the query partitioner.AsParallel().
    /// </summary>
    /// <typeparam name="TElement">The type of the elements produced by the partitioner.</typeparam>
    internal class PartitionerQueryOperator<TElement> : QueryOperator<TElement>
    {
        private readonly Partitioner<TElement> m_partitioner; // The partitioner to use as data source.

        /// <summary>
        /// Creates an operator that reads its input from the given partitioner.
        /// </summary>
        /// <param name="partitioner">The partitioner to use as data source.</param>
        internal PartitionerQueryOperator(Partitioner<TElement> partitioner)
            : base(false, QuerySettings.Empty)
        {
            m_partitioner = partitioner;
        }

        /// <summary>
        /// Whether the underlying partitioner is an OrderablePartitioner, i.e. whether it
        /// can hand out order keys together with the elements.
        /// </summary>
        internal bool Orderable
        {
            get { return m_partitioner is OrderablePartitioner<TElement>; }
        }

        /// <summary>
        /// Opens the operator by wrapping the partitioner and the query settings in a
        /// results object. No partitions are requested from the partitioner here; that
        /// happens in PartitionerQueryOperatorResults.GivePartitionedStream.
        /// </summary>
        /// <param name="settings">Settings collected from the query.</param>
        /// <param name="preferStriping">Ignored; see the comment in the body.</param>
        internal override QueryResults<TElement> Open(QuerySettings settings, bool preferStriping)
        {
            // Notice that the preferStriping argument is not used. Partitioner does not support
            // striped partitioning.

            return new PartitionerQueryOperatorResults(m_partitioner, settings);
        }

        //----------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
        // A single partition is requested from the partitioner and drained lazily. The
        // partition enumerator is disposed when the iteration completes or the iterator is
        // disposed. The token parameter is not consulted by this implementation.
        //

        internal override IEnumerable<TElement> AsSequentialQuery(CancellationToken token)
        {
            using (IEnumerator<TElement> enumerator = m_partitioner.GetPartitions(1)[0])
            {
                while (enumerator.MoveNext())
                {
                    yield return enumerator.Current;
                }
            }
        }

        //---------------------------------------------------------------------------------------
        // The state of the order index of the results returned by this operator.
        //

        internal override OrdinalIndexState OrdinalIndexState
        {
            get { return GetOrdinalIndexState(m_partitioner); }
        }

        /// <summary>
        /// Determines the OrdinalIndexState for a partitioner.
        /// </summary>
        /// <param name="partitioner">The partitioner to inspect.</param>
        /// <returns>
        /// Shuffled if the partitioner is not an OrderablePartitioner or its keys are not
        /// ordered in each partition; Correct if the keys are ordered in each partition and
        /// normalized; Increasing if they are ordered in each partition but not normalized.
        /// </returns>
        internal static OrdinalIndexState GetOrdinalIndexState(Partitioner<TElement> partitioner)
        {
            OrderablePartitioner<TElement> orderablePartitioner = partitioner as OrderablePartitioner<TElement>;

            if (orderablePartitioner == null)
            {
                return OrdinalIndexState.Shuffled;
            }

            if (orderablePartitioner.KeysOrderedInEachPartition)
            {
                if (orderablePartitioner.KeysNormalized)
                {
                    return OrdinalIndexState.Correct;
                }
                else
                {
                    return OrdinalIndexState.Increasing;
                }
            }
            else
            {
                return OrdinalIndexState.Shuffled;
            }
        }


        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }


        /// <summary>
        /// QueryResults for a PartitionerQueryOperator
        /// </summary>
        private class PartitionerQueryOperatorResults : QueryResults<TElement>
        {
            private readonly Partitioner<TElement> m_partitioner; // The data source for the query

            private QuerySettings m_settings; // Settings collected from the query

            internal PartitionerQueryOperatorResults(Partitioner<TElement> partitioner, QuerySettings settings)
            {
                m_partitioner = partitioner;
                m_settings = settings;
            }

            /// <summary>
            /// Asks the partitioner for one partition per degree of parallelism, wraps each
            /// partition in a QueryOperatorEnumerator and hands the resulting partitioned
            /// stream to the recipient.
            /// </summary>
            /// <param name="recipient">Receives the partitioned stream.</param>
            /// <exception cref="InvalidOperationException">
            /// The partitioner returned a null partition list, a list whose count differs from
            /// the requested partition count, or a list containing a null partition.
            /// </exception>
            internal override void GivePartitionedStream(IPartitionedStreamRecipient<TElement> recipient)
            {
                Contract.Assert(m_settings.DegreeOfParallelism.HasValue);
                int partitionCount = m_settings.DegreeOfParallelism.Value;

                OrderablePartitioner<TElement> orderablePartitioner = m_partitioner as OrderablePartitioner<TElement>;

                // If the partitioner is not orderable, it will yield zeros as order keys. The order index state
                // is irrelevant.
                OrdinalIndexState indexState = (orderablePartitioner != null)
                    ? GetOrdinalIndexState(orderablePartitioner)
                    : OrdinalIndexState.Shuffled;

                PartitionedStream<TElement, int> partitions = new PartitionedStream<TElement, int>(
                    partitionCount,
                    Util.GetDefaultComparer<int>(),
                    indexState);

                if (orderablePartitioner != null)
                {
                    IList<IEnumerator<KeyValuePair<long, TElement>>> partitionerPartitions =
                        orderablePartitioner.GetOrderablePartitions(partitionCount);

                    ValidatePartitionList(partitionerPartitions, partitionCount);

                    for (int i = 0; i < partitionCount; i++)
                    {
                        IEnumerator<KeyValuePair<long, TElement>> partition = partitionerPartitions[i];
                        if (partition == null)
                        {
                            throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartition));
                        }

                        partitions[i] = new OrderablePartitionerEnumerator(partition);
                    }
                }
                else
                {
                    IList<IEnumerator<TElement>> partitionerPartitions =
                        m_partitioner.GetPartitions(partitionCount);

                    ValidatePartitionList(partitionerPartitions, partitionCount);

                    for (int i = 0; i < partitionCount; i++)
                    {
                        IEnumerator<TElement> partition = partitionerPartitions[i];
                        if (partition == null)
                        {
                            throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartition));
                        }

                        partitions[i] = new PartitionerEnumerator(partition);
                    }
                }

                recipient.Receive(partitions);
            }

            /// <summary>
            /// Checks that the list returned by the partitioner is non-null and holds exactly
            /// the requested number of partitions. Individual entries are checked by the caller.
            /// </summary>
            private static void ValidatePartitionList<TPartition>(IList<TPartition> partitionList, int expectedCount)
            {
                if (partitionList == null)
                {
                    throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartitionList));
                }

                if (partitionList.Count != expectedCount)
                {
                    throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_WrongNumberOfPartitions));
                }
            }

        }

        /// <summary>
        /// Enumerator that converts an enumerator over key-value pairs exposed by a partitioner
        /// to a QueryOperatorEnumerator used by PLINQ internally.
        /// </summary>
        private class OrderablePartitionerEnumerator : QueryOperatorEnumerator<TElement, int>
        {
            private readonly IEnumerator<KeyValuePair<long, TElement>> m_sourceEnumerator;

            internal OrderablePartitionerEnumerator(IEnumerator<KeyValuePair<long, TElement>> sourceEnumerator)
            {
                m_sourceEnumerator = sourceEnumerator;
            }

            /// <summary>
            /// Advances the source enumerator and copies the pair's value and key into the
            /// caller's variables. Returns false, leaving both untouched, once the source is
            /// exhausted.
            /// </summary>
            /// <exception cref="OverflowException">The 64-bit order key does not fit in an int.</exception>
            internal override bool MoveNext(ref TElement currentElement, ref int currentKey)
            {
                if (!m_sourceEnumerator.MoveNext()) return false;

                KeyValuePair<long, TElement> current = m_sourceEnumerator.Current;
                currentElement = current.Value;

                // The partitioner hands out long keys; narrow with overflow checking rather
                // than silently wrapping.
                checked
                {
                    currentKey = (int)current.Key;
                }

                return true;
            }

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_sourceEnumerator != null);
                m_sourceEnumerator.Dispose();
            }
        }

        /// <summary>
        /// Enumerator that converts an enumerator over plain elements exposed by a
        /// (non-orderable) partitioner to a QueryOperatorEnumerator used by PLINQ internally.
        /// Every element is reported with an order key of zero.
        /// </summary>
        private class PartitionerEnumerator : QueryOperatorEnumerator<TElement, int>
        {
            private readonly IEnumerator<TElement> m_sourceEnumerator;

            internal PartitionerEnumerator(IEnumerator<TElement> sourceEnumerator)
            {
                m_sourceEnumerator = sourceEnumerator;
            }

            /// <summary>
            /// Advances the source enumerator and copies its current element into the
            /// caller's variable, setting the key to zero. Returns false, leaving both
            /// untouched, once the source is exhausted.
            /// </summary>
            internal override bool MoveNext(ref TElement currentElement, ref int currentKey)
            {
                if (!m_sourceEnumerator.MoveNext()) return false;

                currentElement = m_sourceEnumerator.Current;
                currentKey = 0;

                return true;
            }

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_sourceEnumerator != null);
                m_sourceEnumerator.Dispose();
            }
        }
    }

}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// PartitionerQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System; 
using System.Collections.Generic;
using System.Linq; 
using System.Text; 
using System.Collections.Concurrent;
using System.Linq.Parallel; 
using System.Diagnostics.Contracts;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// A QueryOperator that represents the output of the query partitioner.AsParallel().
    /// </summary>
    /// <typeparam name="TElement">The type of the elements produced by the partitioner.</typeparam>
    internal class PartitionerQueryOperator<TElement> : QueryOperator<TElement>
    {
        private readonly Partitioner<TElement> m_partitioner; // The partitioner to use as data source.

        /// <summary>
        /// Creates an operator that reads its input from the given partitioner.
        /// </summary>
        /// <param name="partitioner">The partitioner to use as data source.</param>
        internal PartitionerQueryOperator(Partitioner<TElement> partitioner)
            : base(false, QuerySettings.Empty)
        {
            m_partitioner = partitioner;
        }

        /// <summary>
        /// Whether the underlying partitioner is an OrderablePartitioner, i.e. whether it
        /// can hand out order keys together with the elements.
        /// </summary>
        internal bool Orderable
        {
            get { return m_partitioner is OrderablePartitioner<TElement>; }
        }

        /// <summary>
        /// Opens the operator by wrapping the partitioner and the query settings in a
        /// results object. No partitions are requested from the partitioner here; that
        /// happens in PartitionerQueryOperatorResults.GivePartitionedStream.
        /// </summary>
        /// <param name="settings">Settings collected from the query.</param>
        /// <param name="preferStriping">Ignored; see the comment in the body.</param>
        internal override QueryResults<TElement> Open(QuerySettings settings, bool preferStriping)
        {
            // Notice that the preferStriping argument is not used. Partitioner does not support
            // striped partitioning.

            return new PartitionerQueryOperatorResults(m_partitioner, settings);
        }

        //----------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
        // A single partition is requested from the partitioner and drained lazily. The
        // partition enumerator is disposed when the iteration completes or the iterator is
        // disposed. The token parameter is not consulted by this implementation.
        //

        internal override IEnumerable<TElement> AsSequentialQuery(CancellationToken token)
        {
            using (IEnumerator<TElement> enumerator = m_partitioner.GetPartitions(1)[0])
            {
                while (enumerator.MoveNext())
                {
                    yield return enumerator.Current;
                }
            }
        }

        //---------------------------------------------------------------------------------------
        // The state of the order index of the results returned by this operator.
        //

        internal override OrdinalIndexState OrdinalIndexState
        {
            get { return GetOrdinalIndexState(m_partitioner); }
        }

        /// <summary>
        /// Determines the OrdinalIndexState for a partitioner.
        /// </summary>
        /// <param name="partitioner">The partitioner to inspect.</param>
        /// <returns>
        /// Shuffled if the partitioner is not an OrderablePartitioner or its keys are not
        /// ordered in each partition; Correct if the keys are ordered in each partition and
        /// normalized; Increasing if they are ordered in each partition but not normalized.
        /// </returns>
        internal static OrdinalIndexState GetOrdinalIndexState(Partitioner<TElement> partitioner)
        {
            OrderablePartitioner<TElement> orderablePartitioner = partitioner as OrderablePartitioner<TElement>;

            if (orderablePartitioner == null)
            {
                return OrdinalIndexState.Shuffled;
            }

            if (orderablePartitioner.KeysOrderedInEachPartition)
            {
                if (orderablePartitioner.KeysNormalized)
                {
                    return OrdinalIndexState.Correct;
                }
                else
                {
                    return OrdinalIndexState.Increasing;
                }
            }
            else
            {
                return OrdinalIndexState.Shuffled;
            }
        }


        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }


        /// <summary>
        /// QueryResults for a PartitionerQueryOperator
        /// </summary>
        private class PartitionerQueryOperatorResults : QueryResults<TElement>
        {
            private readonly Partitioner<TElement> m_partitioner; // The data source for the query

            private QuerySettings m_settings; // Settings collected from the query

            internal PartitionerQueryOperatorResults(Partitioner<TElement> partitioner, QuerySettings settings)
            {
                m_partitioner = partitioner;
                m_settings = settings;
            }

            /// <summary>
            /// Asks the partitioner for one partition per degree of parallelism, wraps each
            /// partition in a QueryOperatorEnumerator and hands the resulting partitioned
            /// stream to the recipient.
            /// </summary>
            /// <param name="recipient">Receives the partitioned stream.</param>
            /// <exception cref="InvalidOperationException">
            /// The partitioner returned a null partition list, a list whose count differs from
            /// the requested partition count, or a list containing a null partition.
            /// </exception>
            internal override void GivePartitionedStream(IPartitionedStreamRecipient<TElement> recipient)
            {
                Contract.Assert(m_settings.DegreeOfParallelism.HasValue);
                int partitionCount = m_settings.DegreeOfParallelism.Value;

                OrderablePartitioner<TElement> orderablePartitioner = m_partitioner as OrderablePartitioner<TElement>;

                // If the partitioner is not orderable, it will yield zeros as order keys. The order index state
                // is irrelevant.
                OrdinalIndexState indexState = (orderablePartitioner != null)
                    ? GetOrdinalIndexState(orderablePartitioner)
                    : OrdinalIndexState.Shuffled;

                PartitionedStream<TElement, int> partitions = new PartitionedStream<TElement, int>(
                    partitionCount,
                    Util.GetDefaultComparer<int>(),
                    indexState);

                if (orderablePartitioner != null)
                {
                    IList<IEnumerator<KeyValuePair<long, TElement>>> partitionerPartitions =
                        orderablePartitioner.GetOrderablePartitions(partitionCount);

                    ValidatePartitionList(partitionerPartitions, partitionCount);

                    for (int i = 0; i < partitionCount; i++)
                    {
                        IEnumerator<KeyValuePair<long, TElement>> partition = partitionerPartitions[i];
                        if (partition == null)
                        {
                            throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartition));
                        }

                        partitions[i] = new OrderablePartitionerEnumerator(partition);
                    }
                }
                else
                {
                    IList<IEnumerator<TElement>> partitionerPartitions =
                        m_partitioner.GetPartitions(partitionCount);

                    ValidatePartitionList(partitionerPartitions, partitionCount);

                    for (int i = 0; i < partitionCount; i++)
                    {
                        IEnumerator<TElement> partition = partitionerPartitions[i];
                        if (partition == null)
                        {
                            throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartition));
                        }

                        partitions[i] = new PartitionerEnumerator(partition);
                    }
                }

                recipient.Receive(partitions);
            }

            /// <summary>
            /// Checks that the list returned by the partitioner is non-null and holds exactly
            /// the requested number of partitions. Individual entries are checked by the caller.
            /// </summary>
            private static void ValidatePartitionList<TPartition>(IList<TPartition> partitionList, int expectedCount)
            {
                if (partitionList == null)
                {
                    throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartitionList));
                }

                if (partitionList.Count != expectedCount)
                {
                    throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_WrongNumberOfPartitions));
                }
            }

        }

        /// <summary>
        /// Enumerator that converts an enumerator over key-value pairs exposed by a partitioner
        /// to a QueryOperatorEnumerator used by PLINQ internally.
        /// </summary>
        private class OrderablePartitionerEnumerator : QueryOperatorEnumerator<TElement, int>
        {
            private readonly IEnumerator<KeyValuePair<long, TElement>> m_sourceEnumerator;

            internal OrderablePartitionerEnumerator(IEnumerator<KeyValuePair<long, TElement>> sourceEnumerator)
            {
                m_sourceEnumerator = sourceEnumerator;
            }

            /// <summary>
            /// Advances the source enumerator and copies the pair's value and key into the
            /// caller's variables. Returns false, leaving both untouched, once the source is
            /// exhausted.
            /// </summary>
            /// <exception cref="OverflowException">The 64-bit order key does not fit in an int.</exception>
            internal override bool MoveNext(ref TElement currentElement, ref int currentKey)
            {
                if (!m_sourceEnumerator.MoveNext()) return false;

                KeyValuePair<long, TElement> current = m_sourceEnumerator.Current;
                currentElement = current.Value;

                // The partitioner hands out long keys; narrow with overflow checking rather
                // than silently wrapping.
                checked
                {
                    currentKey = (int)current.Key;
                }

                return true;
            }

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_sourceEnumerator != null);
                m_sourceEnumerator.Dispose();
            }
        }

        /// <summary>
        /// Enumerator that converts an enumerator over plain elements exposed by a
        /// (non-orderable) partitioner to a QueryOperatorEnumerator used by PLINQ internally.
        /// Every element is reported with an order key of zero.
        /// </summary>
        private class PartitionerEnumerator : QueryOperatorEnumerator<TElement, int>
        {
            private readonly IEnumerator<TElement> m_sourceEnumerator;

            internal PartitionerEnumerator(IEnumerator<TElement> sourceEnumerator)
            {
                m_sourceEnumerator = sourceEnumerator;
            }

            /// <summary>
            /// Advances the source enumerator and copies its current element into the
            /// caller's variable, setting the key to zero. Returns false, leaving both
            /// untouched, once the source is exhausted.
            /// </summary>
            internal override bool MoveNext(ref TElement currentElement, ref int currentKey)
            {
                if (!m_sourceEnumerator.MoveNext()) return false;

                currentElement = m_sourceEnumerator.Current;
                currentKey = 0;

                return true;
            }

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_sourceEnumerator != null);
                m_sourceEnumerator.Dispose();
            }
        }
    }

}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK