PartitionerStatic.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / clr / src / BCL / System / Collections / Concurrent / PartitionerStatic.cs / 1305376 / PartitionerStatic.cs

                            #pragma warning disable 0420 
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
// 
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ 
// 
// PartitionerStatic.cs
// 
// [....]
//
// A class of default partitioners for Partitioner
// 
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
 
using System.Collections.Generic; 
using System.Security.Permissions;
using System.Threading; 
using System.Diagnostics.Contracts;
using System.Runtime.InteropServices;

namespace System.Collections.Concurrent 
{
 
    // The static class Partitioner implements 3 default partitioning strategies:
    // 1. dynamic load balance partitioning for indexable data source (IList and arrays)
    // 2. static partitioning for indexable data source (IList and arrays)
    // 3. dynamic load balance partitioning for enumerables. Enumerables have indexes, which are the natural order
    //    of elements, but enumerators are not indexable
    // - data source of type IList/arrays have both dynamic and static partitioning, as 1 and 2.
    //   We assume that the source data of IList/Array is not changing concurrently.
    // - data source of type IEnumerable can only be partitioned dynamically (load-balance)
    // - Dynamic partitioning methods 1 and 3 are same, both being dynamic and load-balance. But the
    //   implementation is different for data source of IList/Array vs. IEnumerable:
    //   * When the source collection is IList/Arrays, we use Interlocked on the shared index;
    //   * When the source collection is IEnumerable, we use Monitor to wrap around the access to the source
    //     enumerator.

    /// <summary>
    /// Provides common partitioning strategies for arrays, lists, and enumerables.
    /// </summary>
    /// <remarks>
    /// <para>
    /// The static methods on <see cref="Partitioner"/> are all thread-safe and may be used concurrently
    /// from multiple threads. However, while a created partitioner is in use, the underlying data source
    /// should not be modified, whether from the same thread that's using a partitioner or from a separate
    /// thread.
    /// </para>
    /// </remarks>
    [HostProtection(Synchronization = true, ExternalThreading = true)]
    public static class Partitioner 
    { 
        /// <summary>
        /// Creates an orderable partitioner from an <see cref="System.Collections.Generic.IList{T}"/>
        /// instance.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in source list.</typeparam>
        /// <param name="list">The list to be partitioned.</param>
        /// <param name="loadBalance">
        /// A Boolean value that indicates whether the created partitioner should dynamically
        /// load balance between partitions rather than statically partition.
        /// </param>
        /// <returns>
        /// An orderable partitioner based on the input list.
        /// </returns>
        /// <exception cref="System.ArgumentNullException">
        /// The <paramref name="list"/> argument is null.
        /// </exception>
        public static OrderablePartitioner<TSource> Create<TSource>(IList<TSource> list, bool loadBalance)
        {
            if (list == null)
            {
                throw new ArgumentNullException("list");
            }

            // Pick the strategy: shared-index dynamic chunking vs. fixed index ranges.
            if (loadBalance)
            {
                return (new DynamicPartitionerForIList<TSource>(list));
            }
            else
            {
                return (new StaticIndexRangePartitionerForIList<TSource>(list));
            }
        }
 
        /// <summary>
        /// Creates an orderable partitioner from a <see cref="System.Array"/> instance.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in source array.</typeparam>
        /// <param name="array">The array to be partitioned.</param>
        /// <param name="loadBalance">
        /// A Boolean value that indicates whether the created partitioner should dynamically load balance
        /// between partitions rather than statically partition.
        /// </param>
        /// <returns>
        /// An orderable partitioner based on the input array.
        /// </returns>
        /// <exception cref="System.ArgumentNullException">
        /// The <paramref name="array"/> argument is null.
        /// </exception>
        public static OrderablePartitioner<TSource> Create<TSource>(TSource[] array, bool loadBalance)
        {
            // This implementation uses 'ldelem' instructions for element retrieval, rather than using a
            // method call.

            if (array == null)
            {
                throw new ArgumentNullException("array");
            }

            if (loadBalance)
            {
                return (new DynamicPartitionerForArray<TSource>(array));
            }
            else
            {
                return (new StaticIndexRangePartitionerForArray<TSource>(array));
            }
        }
 
        /// <summary>
        /// Creates an orderable partitioner from a <see cref="System.Collections.Generic.IEnumerable{TSource}"/> instance.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in source enumerable.</typeparam>
        /// <param name="source">The enumerable to be partitioned.</param>
        /// <returns>
        /// An orderable partitioner based on the input enumerable.
        /// </returns>
        /// <remarks>
        /// The ordering used in the created partitioner is determined by the natural order of the elements
        /// as retrieved from the source enumerable.
        /// </remarks>
        public static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source)
        {
            // -1 asks the internal overload to use the default maximum chunk size.
            return Create<TSource>(source, -1);
        }

        // Internal version that allows user to specify the maxChunkSize, rather than using the default.
        // Used by range partitioning methods to ensure that only one range at a time is chunked.
        // A maxChunkSize of -1 means "use the default".
        internal static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source, int maxChunkSize)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            // Sanity checks.  If and when we make this method public, these should be converted to exceptions.
            Contract.Assert(maxChunkSize != 0, "maxChunkSize specified as 0.");
            Contract.Assert((maxChunkSize == -1) || (maxChunkSize < (1 << 29)), "maxChunkSize out of range");

            return (new DynamicPartitionerForIEnumerable<TSource>(source, maxChunkSize));
        }

#if !PFX_LEGACY_3_5
        /// <summary>Creates a partitioner that chunks the user-specified range.</summary>
        /// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
        /// <param name="toExclusive">The upper, exclusive bound of the range.</param>
        /// <returns>A partitioner.</returns>
        /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is
        /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
        public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive)
        {
            // How many chunks do we want to divide the range into?  If this is 1, then the
            // answer is "one chunk per core".  Generally, though, you'll achieve better
            // load balancing on a busy system if you make it higher than 1.
            const int coreOversubscriptionRate = 3;

            if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");

            // The width of the range can exceed long.MaxValue (e.g. long.MinValue..long.MaxValue), so
            // compute it in decimal; a plain long subtraction would wrap to a negative or tiny value.
            decimal span = (decimal)toExclusive - fromInclusive;
            long rangeSize = (long)(span / (Environment.ProcessorCount * coreOversubscriptionRate));
            if (rangeSize == 0) rangeSize = 1;
            return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time
        }

        /// <summary>Creates a partitioner that chunks the user-specified range.</summary>
        /// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
        /// <param name="toExclusive">The upper, exclusive bound of the range.</param>
        /// <param name="rangeSize">The size of each subrange.</param>
        /// <returns>A partitioner.</returns>
        /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is
        /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="rangeSize"/> argument is
        /// less than or equal to 0.</exception>
        public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive, long rangeSize)
        {
            if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
            if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize");
            return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time
        }

	    // Private method to parcel out range tuples. 
        /// <summary>
        /// Lazily yields consecutive (from, to) tuples of width <paramref name="rangeSize"/> that cover
        /// [<paramref name="fromInclusive"/>, <paramref name="toExclusive"/>); the last tuple is clamped
        /// to <paramref name="toExclusive"/>.
        /// </summary>
        /// <param name="fromInclusive">Start of the first range.</param>
        /// <param name="toExclusive">Exclusive end of the last range.</param>
        /// <param name="rangeSize">Width of each range; the public callers guarantee a positive value.</param>
        private static IEnumerable<Tuple<long, long>> CreateRanges(long fromInclusive, long toExclusive, long rangeSize)
        {
            // Set once the upper bound computation overflows; the range produced then is the last one.
            bool shouldQuit = false;

            for (long i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize)
            {
                long from = i;
                long to;
                try { checked { to = i + rangeSize; } }
                catch (OverflowException)
                {
                    // i + rangeSize does not fit in a long: finish with a range ending at toExclusive.
                    to = toExclusive;
                    shouldQuit = true;
                }
                if (to > toExclusive) to = toExclusive;
                yield return new Tuple<long, long>(from, to);
            }
        }
 
        /// <summary>Creates a partitioner that chunks the user-specified range.</summary>
        /// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
        /// <param name="toExclusive">The upper, exclusive bound of the range.</param>
        /// <returns>A partitioner.</returns>
        /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is
        /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
        public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive)
        {
            // How many chunks do we want to divide the range into?  If this is 1, then the
            // answer is "one chunk per core".  Generally, though, you'll achieve better
            // load balancing on a busy system if you make it higher than 1.
            const int coreOversubscriptionRate = 3;

            if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");

            // The width of the range can exceed int.MaxValue (e.g. -10..int.MaxValue), so compute it
            // in long; an int subtraction would wrap and produce a negative or tiny range size.
            // The quotient always fits in an int because the divisor is at least 3.
            long span = (long)toExclusive - fromInclusive;
            int rangeSize = (int)(span / (Environment.ProcessorCount * coreOversubscriptionRate));
            if (rangeSize == 0) rangeSize = 1;
            return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time
        }

        /// <summary>Creates a partitioner that chunks the user-specified range.</summary>
        /// <param name="fromInclusive">The lower, inclusive bound of the range.</param>
        /// <param name="toExclusive">The upper, exclusive bound of the range.</param>
        /// <param name="rangeSize">The size of each subrange.</param>
        /// <returns>A partitioner.</returns>
        /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="toExclusive"/> argument is
        /// less than or equal to the <paramref name="fromInclusive"/> argument.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException"> The <paramref name="rangeSize"/> argument is
        /// less than or equal to 0.</exception>
        public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize)
        {
            if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
            if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize");
            return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time
        }
 
 	    // Private method to parcel out range tuples. 
        /// <summary>
        /// Lazily yields consecutive (from, to) tuples of width <paramref name="rangeSize"/> that cover
        /// [<paramref name="fromInclusive"/>, <paramref name="toExclusive"/>); the last tuple is clamped
        /// to <paramref name="toExclusive"/>.
        /// </summary>
        /// <param name="fromInclusive">Start of the first range.</param>
        /// <param name="toExclusive">Exclusive end of the last range.</param>
        /// <param name="rangeSize">Width of each range; the public callers guarantee a positive value.</param>
        private static IEnumerable<Tuple<int, int>> CreateRanges(int fromInclusive, int toExclusive, int rangeSize)
        {
            // Set once the upper bound computation overflows; the range produced then is the last one.
            bool shouldQuit = false;

            for (int i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize)
            {
                int from = i;
                int to;
                try { checked { to = i + rangeSize; } }
                catch (OverflowException)
                {
                    // i + rangeSize does not fit in an int: finish with a range ending at toExclusive.
                    to = toExclusive;
                    shouldQuit = true;
                }
                if (to > toExclusive) to = toExclusive;
                yield return new Tuple<int, int>(from, to);
            }
        }
#endif	 

        #region DynamicPartitionEnumerator_Abstract class
        /// <summary>
        /// DynamicPartitionEnumerator_Abstract defines the enumerator for each partition for the dynamic load-balance
        /// partitioning algorithm.
        /// - Partition is an enumerator of KeyValuePairs, each corresponding to an item in the data source:
        ///   the key is the index in the source collection; the value is the item itself.
        /// - a set of such partitions share a reader over data source. The type of the reader is specified by
        ///   TSourceReader.
        /// - each partition requests a contiguous chunk of elements at a time from the source data. The chunk
        ///   size is initially 1, and doubles after every CHUNK_DOUBLING_RATE grabs until it reaches the
        ///   maximum chunk size.
        ///   The implementation for GrabNextChunk() method has two versions: one for data source of IndexRange
        ///   types (IList and the array), one for data source of IEnumerable.
        /// - The method "Reset" is not supported for any partitioning algorithm.
        /// - The implementation for MoveNext() method is same for all dynamic partitioners, so we provide it
        ///   in this abstract class.
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the data source</typeparam>
        /// <typeparam name="TSourceReader">Type of the reader on the data source</typeparam>
        //TSourceReader is
        //  - IList<TSource>, when source data is IList<TSource>, the shared reader is source data itself
        //  - TSource[], when source data is TSource[], the shared reader is source data itself
        //  - IEnumerator<TSource>, when source data is IEnumerable<TSource>, and the shared reader is an
        //    enumerator of the source data
        private abstract class DynamicPartitionEnumerator_Abstract<TSource, TSourceReader> : IEnumerator<KeyValuePair<long, TSource>>
        {
            //----------------- common fields and constructor for all dynamic partitioners -----------------
            //--- shared by all derived classes with source data type: IList, Array, and IEnumerator
            protected readonly TSourceReader m_sharedReader;

            protected static int s_defaultMaxChunkSize = GetDefaultChunkSize<TSource>();

            //deferred allocating in MoveNext() with initial value 0, to avoid false sharing
            //we also use the fact that: (m_currentChunkSize==null) means MoveNext is never called on this enumerator
            protected Shared<int> m_currentChunkSize;

            //deferring allocation in MoveNext() with initial value -1, to avoid false sharing
            protected Shared<int> m_localOffset;

            private const int CHUNK_DOUBLING_RATE = 3; // Double the chunk size every this many grabs
            private int m_doublingCountdown; // Number of grabs remaining until chunk size doubles
            protected readonly int m_maxChunkSize; // Max chunk size specified by caller, or s_defaultMaxChunkSize

            // m_sharedIndex is shared by this set of partitions, and particularly when m_sharedReader is
            // IEnumerable it serves as tracking of the natural order of elements in m_sharedReader.
            // The value of this field is passed in from outside (already initialized) by the constructor.
            protected readonly Shared<long> m_sharedIndex;

            /// <summary>
            /// Initializes the enumerator with the default maximum chunk size.
            /// </summary>
            /// <param name="sharedReader">Reader shared by all partitions of the same set.</param>
            /// <param name="sharedIndex">Index shared by all partitions of the same set.</param>
            protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, Shared<long> sharedIndex)
                : this(sharedReader, sharedIndex, -1)
            {
            }

            /// <summary>
            /// Initializes the enumerator with an explicit maximum chunk size.
            /// </summary>
            /// <param name="sharedReader">Reader shared by all partitions of the same set.</param>
            /// <param name="sharedIndex">Index shared by all partitions of the same set.</param>
            /// <param name="maxChunkSize">Maximum chunk size, or -1 to use s_defaultMaxChunkSize.</param>
            protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, Shared<long> sharedIndex, int maxChunkSize)
            {
                Contract.Assert((maxChunkSize == -1) || (maxChunkSize > 0), "maxChunkSize 0 or < -1");

                m_sharedReader = sharedReader;
                m_sharedIndex = sharedIndex;
                if (maxChunkSize == -1) m_maxChunkSize = s_defaultMaxChunkSize;
                else m_maxChunkSize = maxChunkSize;
            }

            // ---------------- abstract method declarations --------------

            /// <summary>
            /// Abstract method to request a contiguous chunk of elements from the source collection
            /// </summary>
            /// <param name="requestedChunkSize">specified number of elements requested</param>
            /// <returns>
            /// true if we successfully reserved at least one element (up to #=requestedChunkSize)
            /// false if all elements in the source collection have been reserved.
            /// </returns>
            //GrabNextChunk does the following:
            //  - grab # of requestedChunkSize elements from source data through shared reader,
            //  - at the time of function returns, m_currentChunkSize is updated with the number of
            //    elements actually obtained (<=requestedChunkSize).
            //  - GrabNextChunk returns true if at least one element is assigned to this partition;
            //    false if the shared reader already hits the last element of the source data before
            //    we call GrabNextChunk
            protected abstract bool GrabNextChunk(int requestedChunkSize);

            /// <summary>
            /// Abstract property, returns whether or not the shared reader has already read the last
            /// element of the source data
            /// </summary>
            protected abstract bool HasNoElementsLeft { get; set; }

            /// <summary>
            /// Get the current element in the current partition. Property required by IEnumerator interface
            /// This property is abstract because the implementation is different depending on the type
            /// of the source data: IList, Array or IEnumerable
            /// </summary>
            public abstract KeyValuePair<long, TSource> Current { get; }

            /// <summary>
            /// Dispose is abstract, and depends on the type of the source data:
            /// - For source data type IList and Array, the type of the shared reader is just the data itself.
            ///   We don't do anything in Dispose method for IList and Array.
            /// - For source data type IEnumerable, the type of the shared reader is an enumerator we created.
            ///   Thus we need to dispose this shared reader enumerator, when there is no more active partitions
            ///   left.
            /// </summary>
            public abstract void Dispose();

            /// <summary>
            /// Reset on partitions is not supported
            /// </summary>
            /// <exception cref="System.NotSupportedException">Always thrown.</exception>
            public void Reset()
            {
                throw new NotSupportedException();
            }


            /// <summary>
            /// Get the current element in the current partition. Property required by IEnumerator interface
            /// </summary>
            Object IEnumerator.Current
            {
                get
                {
                    return ((DynamicPartitionEnumerator_Abstract<TSource, TSourceReader>)this).Current;
                }
            }

            /// <summary>
            /// Moves to the next element if any.
            /// Try current chunk first, if the current chunk do not have any elements left, then we
            /// attempt to grab a chunk from the source collection.
            /// </summary>
            /// <returns>
            /// true if successfully moving to the next position;
            /// false otherwise, if and only if there is no more elements left in the current chunk
            /// AND the source collection is exhausted.
            /// </returns>
            public bool MoveNext()
            {
                //perform deferred allocating of the local variables.
                if (m_localOffset == null)
                {
                    Contract.Assert(m_currentChunkSize == null);
                    m_localOffset = new Shared<int>(-1);
                    m_currentChunkSize = new Shared<int>(0);
                    m_doublingCountdown = CHUNK_DOUBLING_RATE;
                }

                if (m_localOffset.Value < m_currentChunkSize.Value - 1)
                //attempt to grab the next element from the local chunk
                {
                    m_localOffset.Value++;
                    return true;
                }
                else
                //otherwise it means we exhausted the local chunk
                //grab a new chunk from the source enumerator
                {
                    Contract.Assert(m_localOffset.Value == m_currentChunkSize.Value - 1);
                    //set the requested chunk size to a proper value
                    int requestedChunkSize;
                    if (m_currentChunkSize.Value == 0) //first time grabbing from source enumerator
                    {
                        requestedChunkSize = 1;
                    }
                    else if (m_doublingCountdown > 0)
                    {
                        // not yet time to grow: ask for as many elements as the previous chunk held
                        requestedChunkSize = m_currentChunkSize.Value;
                    }
                    else
                    {
                        // countdown expired: double the chunk size, capped at the maximum
                        requestedChunkSize = Math.Min(m_currentChunkSize.Value * 2, m_maxChunkSize);
                        m_doublingCountdown = CHUNK_DOUBLING_RATE; // reset
                    }

                    // Decrement your doubling countdown
                    m_doublingCountdown--;

                    Contract.Assert(requestedChunkSize > 0 && requestedChunkSize <= m_maxChunkSize);
                    //GrabNextChunk will update the value of m_currentChunkSize
                    if (GrabNextChunk(requestedChunkSize))
                    {
                        Contract.Assert(m_currentChunkSize.Value <= requestedChunkSize && m_currentChunkSize.Value > 0);
                        m_localOffset.Value = 0;
                        return true;
                    }
                    else
                    {
                        return false;
                    }
                }
            }
        }
        #endregion
 
        #region Dynamic Partitioner for source data of IEnuemrable<> type
        ///  
        /// Inherits from DynamicPartitioners 
        /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
        /// of EnumerableOfPartitionsForIEnumerator defined internally 
        /// 
        /// Type of elements in the source data
        private class DynamicPartitionerForIEnumerable : OrderablePartitioner
        { 
            IEnumerable m_source;
            int m_maxChunkSize; // a value of -1 means "use default" 
 
            /// <summary>
            /// Creates a partitioner over <paramref name="source"/>.
            /// </summary>
            /// <param name="source">The enumerable to partition.</param>
            /// <param name="maxChunkSize">Maximum chunk size; a value of -1 means "use default".</param>
            internal DynamicPartitionerForIEnumerable(IEnumerable<TSource> source, int maxChunkSize)
                : base(true, false, true)
            {
                m_source = source;
                m_maxChunkSize = maxChunkSize;
            }
 
            /// <summary>
            /// Overrides OrderablePartitioner.GetOrderablePartitions.
            /// Partitions the underlying collection into the given number of orderable partitions.
            /// </summary>
            /// <param name="partitionCount">number of partitions requested</param>
            /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns>
            /// <exception cref="System.ArgumentOutOfRangeException">
            /// <paramref name="partitionCount"/> is less than or equal to 0.
            /// </exception>
            override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
            {
                if (partitionCount <= 0)
                {
                    throw new ArgumentOutOfRangeException("partitionCount");
                }
                IEnumerator<KeyValuePair<long, TSource>>[] partitions
                    = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];

                // A single enumerable (and therefore a single source enumerator) backs every partition.
                IEnumerable<KeyValuePair<long, TSource>> partitionEnumerable = new InternalPartitionEnumerable(m_source.GetEnumerator(), m_maxChunkSize);
                for (int i = 0; i < partitionCount; i++)
                {
                    partitions[i] = partitionEnumerable.GetEnumerator();
                }
                return partitions;
            }

            /// <summary>
            /// Overrides OrderablePartitioner.GetOrderableDynamicPartitions
            /// </summary>
            /// <returns>an enumerable collection of orderable partitions</returns>
            override public IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions()
            {
                // Each call obtains a fresh enumerator from the source.
                return new InternalPartitionEnumerable(m_source.GetEnumerator(), m_maxChunkSize);
            }

            /// <summary>
            /// Reports whether additional partitions can be created dynamically; always true here.
            /// </summary>
            public override bool SupportsDynamicPartitions
            {
                get
                {
                    return true;
                }
            }

            #region Internal classes:  InternalPartitionEnumerable, InternalPartitionEnumerator
            /// <summary>
            /// Provides customized implementation for source data of IEnumerable
            /// Different from the counterpart for IList/Array, this enumerable maintains several additional fields
            /// shared by the partitions it owns, including a boolean "m_hasNoElementsLeft", a shared lock, and a
            /// shared count "m_activePartitionCount"
            /// </summary>
            private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>, IDisposable
            {
                //reader through which we access the source data
                private readonly IEnumerator<TSource> m_sharedReader;
                private readonly Shared<long> m_sharedIndex;//initial value -1

                //fields shared by all partitions that this Enumerable owns
                private readonly Shared<bool> m_hasNoElementsLeft;//initial value false

                //shared synchronization lock, created by this Enumerable
                private readonly object m_sharedLock;

                //set once Dispose has run; GetEnumerator refuses to hand out partitions afterwards
                private bool m_disposed;

                //handed to every partition enumerator created by GetEnumerator; initial value 0
                private readonly Shared<int> m_activePartitionCount;

                private readonly int m_maxChunkSize;

                /// <summary>
                /// Creates the enumerable and the state shared by all of its partitions.
                /// </summary>
                /// <param name="sharedReader">Enumerator over the source data, shared by all partitions.</param>
                /// <param name="maxChunkSize">Maximum chunk size passed on to each partition enumerator.</param>
                internal InternalPartitionEnumerable(IEnumerator<TSource> sharedReader, int maxChunkSize)
                {
                    m_sharedReader = sharedReader;
                    m_sharedIndex = new Shared<long>(-1);
                    m_hasNoElementsLeft = new Shared<bool>(false);
                    m_sharedLock = new object();
                    m_activePartitionCount = new Shared<int>(0);
                    m_maxChunkSize = maxChunkSize;
                }

                /// <summary>
                /// Creates a new partition enumerator that shares this enumerable's reader, index,
                /// end-of-data flag, lock and active partition count.
                /// </summary>
                /// <exception cref="System.ObjectDisposedException">This enumerable has been disposed.</exception>
                public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
                {
                    if (m_disposed)
                    {
                        throw new ObjectDisposedException(Environment.GetResourceString("PartitionerStatic_CanNotCallGetEnumeratorAfterSourceHasBeenDisposed"));
                    }

                    return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex,
                        m_hasNoElementsLeft, m_sharedLock, m_activePartitionCount, this, m_maxChunkSize);
                }

                IEnumerator IEnumerable.GetEnumerator()
                {
                    return ((InternalPartitionEnumerable)this).GetEnumerator();
                }

                /// <summary>
                /// Disposes the shared source enumerator; calls after the first are no-ops.
                /// </summary>
                public void Dispose()
                {
                    if (!m_disposed)
                    {
                        m_disposed = true;
                        m_sharedReader.Dispose();
                    }
                }
            }

            /// 
            /// Inherits from DynamicPartitionEnumerator_Abstract directly
            /// Provides customized implementation for: GrabNextChunk, HasNoElementsLeft, Current, Dispose 
            /// 
            private class InternalPartitionEnumerator : DynamicPartitionEnumerator_Abstract> 
            { 
                //---- fields ----
                //cached local copy of the current chunk 
                private KeyValuePair[] m_localList; //defer allocating to avoid false sharing

                // the values of the following two fields are passed in from
                // outside(already initialized) by the constructor, 
                private readonly Shared m_hasNoElementsLeft;
                private readonly object m_sharedLock; 
                private readonly Shared m_activePartitionCount; 
                private InternalPartitionEnumerable m_enumerable;
                /// <summary>
                /// Creates a partition enumerator over state owned by <paramref name="enumerable"/> and
                /// registers it by incrementing the shared active partition count.
                /// </summary>
                /// <param name="sharedReader">Enumerator over the source data, shared by all partitions.</param>
                /// <param name="sharedIndex">Shared index of the last element read from the source.</param>
                /// <param name="hasNoElementsLeft">Shared flag set once the source is exhausted.</param>
                /// <param name="sharedLock">Lock guarding access to the shared reader.</param>
                /// <param name="activePartitionCount">Shared count of partition enumerators created so far.</param>
                /// <param name="enumerable">The enumerable that created this partition.</param>
                /// <param name="maxChunkSize">Maximum chunk size, or -1 for the default.</param>
                internal InternalPartitionEnumerator(
                    IEnumerator<TSource> sharedReader,
                    Shared<long> sharedIndex,
                    Shared<bool> hasNoElementsLeft,
                    object sharedLock,
                    Shared<int> activePartitionCount,
                    InternalPartitionEnumerable enumerable,
                    int maxChunkSize)
                    : base(sharedReader, sharedIndex, maxChunkSize)
                {
                    m_hasNoElementsLeft = hasNoElementsLeft;
                    m_sharedLock = sharedLock;
                    m_enumerable = enumerable;
                    m_activePartitionCount = activePartitionCount;
                    Interlocked.Increment(ref m_activePartitionCount.Value);
                }

                //overriding methods 

                /// <summary>
                /// Reserves a contiguous range of elements from source data
                /// </summary>
                /// <param name="requestedChunkSize">specified number of elements requested</param>
                /// <returns>
                /// true if we successfully reserved at least one element (up to #=requestedChunkSize)
                /// false if all elements in the source collection have been reserved.
                /// </returns>
                override protected bool GrabNextChunk(int requestedChunkSize)
                {
                    Contract.Assert(requestedChunkSize > 0);

                    // Check the flag before taking the lock so exhausted partitions return quickly.
                    if (HasNoElementsLeft)
                    {
                        return false;
                    }

                    lock (m_sharedLock)
                    {
                        // Check again: another partition may have drained the source while we waited.
                        if (HasNoElementsLeft)
                        {
                            return false;
                        }

                        try
                        {
                            int actualChunkSize;
                            //enumerate over source data until either we got #requestedChunkSize of elements or
                            //MoveNext returns false
                            for (actualChunkSize = 0; actualChunkSize < requestedChunkSize; actualChunkSize++)
                            {
                                if (!m_sharedReader.MoveNext())
                                {
                                    //if MoveNext() return false, we set the flag to inform other partitions
                                    HasNoElementsLeft = true;
                                    break;
                                }

                                //defer allocating to avoid false sharing
                                if (m_localList == null)
                                {
                                    m_localList = new KeyValuePair<long, TSource>[m_maxChunkSize];
                                }
                                Contract.Assert(m_sharedIndex != null); //supplied through the constructor
                                // checked: fail loudly rather than wrap if the element index overflows a long
                                m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
                                m_localList[actualChunkSize]
                                    = new KeyValuePair<long, TSource>(m_sharedIndex.Value,
                                                                      m_sharedReader.Current);
                            }

                            if (actualChunkSize > 0)
                            {
                                m_currentChunkSize.Value = actualChunkSize;
                                return true;
                            }

                            return false;
                        }
                        catch
                        {
                            // If an exception occurs, don't let the other enumerators try to enumerate.
                            // NOTE: this could instead throw an InvalidOperationException, but that would be unexpected
                            //  and not helpful to the end user.  We know the root cause is being communicated already.)
                            HasNoElementsLeft = true;
                            throw;
                        }
                    }
                }

                /// <summary>
                /// Gets or sets the flag that records whether the shared reader has already
                /// read the last element of the source data.
                /// </summary>
                /// <remarks>
                /// We cannot call m_sharedReader.MoveNext() just to probe for the end of the data,
                /// because a MoveNext() cannot be undone. The state is therefore kept in the
                /// boolean m_hasNoElementsLeft, which is shared across all partitions.
                /// </remarks>
                override protected bool HasNoElementsLeft
                {
                    get
                    {
                        bool exhausted = m_hasNoElementsLeft.Value;
                        return exhausted;
                    }
                    set
                    {
                        // One-way latch: the only legal transition is false -> true, exactly once.
                        // Both conditions are checked through Contract.Assert only.
                        Contract.Assert(value);
                        Contract.Assert(!m_hasNoElementsLeft.Value);

                        // The literal 'true' is stored regardless of 'value'.
                        m_hasNoElementsLeft.Value = true;
                    }
                }
 
                /// <summary>
                /// Gets the key/value pair stored at the current offset of this partition's local chunk.
                /// </summary>
                /// <exception cref="InvalidOperationException">
                /// Thrown when m_currentChunkSize is still null, which the check below treats as
                /// "MoveNext has not been called yet".
                /// </exception>
                override public KeyValuePair<long, TSource> Current
                {
                    get
                    {
                        //verify that MoveNext is at least called once before Current is called
                        if (m_currentChunkSize == null)
                        {
                            throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
                        }

                        // The local buffer and the offset into it are only sanity-checked via asserts.
                        Contract.Assert(m_localList != null);
                        Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
                        return (m_localList[m_localOffset.Value]);
                    }
                }
 
                /// <summary>
                /// Disposes the current partition: the number of active partitions for the shared
                /// reader is decremented, and if that number becomes 0 the shared reader we created
                /// is disposed as well (by disposing m_enumerable).
                /// </summary>
                override public void Dispose()
                {
                    // Atomic decrement; only the caller that observes 0 performs the final clean-up.
                    int remainingPartitions = Interlocked.Decrement(ref m_activePartitionCount.Value);
                    if (remainingPartitions != 0)
                    {
                        return;
                    }

                    m_enumerable.Dispose();
                }
            }
            #endregion

        } 
        #endregion
 
        #region Dynamic Partitioner for source data of IndexRange types (IList<> and Array<>) 
        /// <summary>
        /// Dynamic load-balance partitioner. This class is abstract and to be derived from by
        /// the customized partitioner classes for IList, Array, and IEnumerable
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the source data</typeparam>
        /// <typeparam name="TCollection">Type of the source data collection</typeparam>
        private abstract class DynamicPartitionerForIndexRange_Abstract<TSource, TCollection> : OrderablePartitioner<TSource>
        {
            // TCollection can be: IList<TSource>, TSource[] and IEnumerable<TSource>
            // Derived classes specify TCollection, and implement the abstract method GetOrderableDynamicPartitions_Factory accordingly
            TCollection m_data;

            /// <summary>
            /// Constructs a new orderable partitioner
            /// </summary>
            /// <param name="data">source data collection</param>
            protected DynamicPartitionerForIndexRange_Abstract(TCollection data)
                // OrderablePartitioner flags, in order: keysOrderedInEachPartition,
                // keysOrderedAcrossPartitions, keysNormalized
                : base(true, false, true)
            {
                m_data = data;
            }

            /// <summary>
            /// Partition the source data and create an enumerable over the resulting partitions.
            /// </summary>
            /// <param name="data">the source data collection</param>
            /// <returns>an enumerable of partitions of the source data</returns>
            protected abstract IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(TCollection data);

            /// <summary>
            /// Overrides OrderablePartitioner.GetOrderablePartitions.
            /// Partitions the underlying collection into the given number of orderable partitions.
            /// </summary>
            /// <param name="partitionCount">number of partitions requested</param>
            /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns>
            /// <exception cref="ArgumentOutOfRangeException">
            /// <paramref name="partitionCount"/> is less than or equal to zero.
            /// </exception>
            override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
            {
                if (partitionCount <= 0)
                {
                    throw new ArgumentOutOfRangeException("partitionCount");
                }
                IEnumerator<KeyValuePair<long, TSource>>[] partitions
                    = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];

                // A single enumerable is created from the factory; every partition is an
                // enumerator obtained from that same enumerable.
                IEnumerable<KeyValuePair<long, TSource>> partitionEnumerable = GetOrderableDynamicPartitions_Factory(m_data);
                for (int i = 0; i < partitionCount; i++)
                {
                    partitions[i] = partitionEnumerable.GetEnumerator();
                }
                return partitions;
            }

            /// <summary>
            /// Overrides OrderablePartitioner.GetOrderableDynamicPartitions
            /// </summary>
            /// <returns>a enumerable collection of orderable partitions</returns>
            override public IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions()
            {
                return GetOrderableDynamicPartitions_Factory(m_data);
            }

            /// <summary>
            /// Whether additional partitions can be created dynamically.
            /// </summary>
            override public bool SupportsDynamicPartitions
            {
                get { return true; }
            }

        }

        /// <summary>
        /// Defines dynamic partition for source data of IList and Array.
        /// This class inherits DynamicPartitionEnumerator_Abstract
        ///   - implements GrabNextChunk, HasNoElementsLeft, and Dispose methods for IList and Array
        ///   - Current property still remains abstract, implementation is different for IList and Array
        ///   - introduces another abstract member SourceCount, which returns the number of elements in
        ///     the source data. Implementation differs for IList and Array
        /// </summary>
        /// <typeparam name="TSource">Type of the elements in the data source</typeparam>
        /// <typeparam name="TSourceReader">Type of the reader on the source data</typeparam>
        private abstract class DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSourceReader> : DynamicPartitionEnumerator_Abstract<TSource, TSourceReader>
        {
            //fields
            //index in the source data of the first element of the chunk currently reserved by this partition
            protected int m_startIndex; //initially zero

            //constructor
            protected DynamicPartitionEnumeratorForIndexRange_Abstract(TSourceReader sharedReader, Shared<long> sharedIndex)
                : base(sharedReader, sharedIndex)
            {
            }

            //abstract methods
            //the Current property is still abstract, and will be implemented by derived classes
            //we add another abstract property SourceCount to get the number of elements from the source reader

            /// <summary>
            /// Get the number of elements from the source reader.
            /// It calls IList.Count or Array.Length
            /// </summary>
            protected abstract int SourceCount { get; }

            //overriding methods

            /// <summary>
            /// Reserves a contiguous range of elements from source data
            /// </summary>
            /// <param name="requestedChunkSize">specified number of elements requested</param>
            /// <returns>
            /// true if we successfully reserved at least one element (up to #=requestedChunkSize)
            /// false if all elements in the source collection have been reserved.
            /// </returns>
            override protected bool GrabNextChunk(int requestedChunkSize)
            {
                Contract.Assert(requestedChunkSize > 0);

                // Retry loop: a failed compare-exchange means the shared index changed since
                // we read it, so we re-read and try again.
                while (!HasNoElementsLeft)
                {
                    Contract.Assert(m_sharedIndex != null);
                    long oldSharedIndex = m_sharedIndex.Value;

                    if (HasNoElementsLeft)
                    {
                        //HasNoElementsLeft situation changed from false to true immediately
                        //and oldSharedIndex becomes stale
                        return false;
                    }

                    //there won't be overflow, because the index of IList/array is int, and we
                    //have casted it to long.
                    long newSharedIndex = Math.Min(SourceCount - 1, oldSharedIndex + requestedChunkSize);


                    //the following CAS, if successful, reserves a chunk of elements [oldSharedIndex+1, newSharedIndex]
                    //inclusive in the source collection
                    if (Interlocked.CompareExchange(ref m_sharedIndex.Value, newSharedIndex, oldSharedIndex)
                        == oldSharedIndex)
                    {
                        //set up local indexes.
                        //m_currentChunkSize is always set to requestedChunkSize when source data had
                        //enough elements of what we requested
                        m_currentChunkSize.Value = (int)(newSharedIndex - oldSharedIndex);
                        m_localOffset.Value = -1;
                        m_startIndex = (int)(oldSharedIndex + 1);
                        return true;
                    }
                }
                //didn't get any element, return false;
                return false;
            }

            /// <summary>
            /// Returns whether or not the shared index has already reached the last
            /// element of the source data (m_sharedIndex.Value >= SourceCount - 1).
            /// </summary>
            override protected bool HasNoElementsLeft
            {
                get
                {
                    Contract.Assert(m_sharedIndex != null);
                    return m_sharedIndex.Value >= SourceCount - 1;
                }
                set
                {
                    //the value is derived from the shared index, so it must never be assigned
                    Contract.Assert(false);
                }
            }

            /// <summary>
            /// For source data type IList and Array, the type of the shared reader is just the data itself.
            /// We don't do anything in Dispose method for IList and Array.
            /// </summary>
            override public void Dispose()
            { }
        }


        /// <summary>
        /// Inherits from DynamicPartitionerForIndexRange_Abstract
        /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
        /// of InternalPartitionEnumerable defined internally
        /// </summary>
        /// <typeparam name="TSource">Type of elements in the source data</typeparam>
        private class DynamicPartitionerForIList<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, IList<TSource>>
        {
            //constructor
            internal DynamicPartitionerForIList(IList<TSource> source)
                : base(source)
            { }

            //override methods
            override protected IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(IList<TSource> m_data)
            {
                //m_data itself serves as shared reader
                return new InternalPartitionEnumerable(m_data);
            }

            /// <summary>
            /// Enumerable of partitions over an IList source.
            /// Every enumerator it hands out receives the same list and the same shared index object.
            /// </summary>
            private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>
            {
                //reader through which we access the source data
                private readonly IList<TSource> m_sharedReader;
                //shared index object passed to every enumerator; starts at -1
                private Shared<long> m_sharedIndex;

                internal InternalPartitionEnumerable(IList<TSource> sharedReader)
                {
                    m_sharedReader = sharedReader;
                    m_sharedIndex = new Shared<long>(-1);
                }

                public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
                {
                    return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
                }

                IEnumerator IEnumerable.GetEnumerator()
                {
                    return ((InternalPartitionEnumerable)this).GetEnumerator();
                }
            }

            /// <summary>
            /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
            /// Provides customized implementation of SourceCount property and Current property for IList
            /// </summary>
            private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, IList<TSource>>
            {
                //constructor
                internal InternalPartitionEnumerator(IList<TSource> sharedReader, Shared<long> sharedIndex)
                    : base(sharedReader, sharedIndex)
                { }

                //overriding methods
                override protected int SourceCount
                {
                    get { return m_sharedReader.Count; }
                }

                /// <summary>
                /// return a KeyValuePair of the current element and its key
                /// (the key is the element's index in the source list)
                /// </summary>
                /// <exception cref="InvalidOperationException">
                /// Thrown when m_currentChunkSize is still null, which the check below treats as
                /// "MoveNext has not been called yet".
                /// </exception>
                override public KeyValuePair<long, TSource> Current
                {
                    get
                    {
                        //verify that MoveNext is at least called once before Current is called
                        if (m_currentChunkSize == null)
                        {
                            throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
                        }

                        Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
                        return new KeyValuePair<long, TSource>(m_startIndex + m_localOffset.Value,
                            m_sharedReader[m_startIndex + m_localOffset.Value]);
                    }
                }
            }
        }
 
 

        /// <summary>
        /// Inherits from DynamicPartitionerForIndexRange_Abstract
        /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
        /// of InternalPartitionEnumerable defined internally
        /// </summary>
        /// <typeparam name="TSource">Type of elements in the source data</typeparam>
        private class DynamicPartitionerForArray<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, TSource[]>
        {
            //constructor
            internal DynamicPartitionerForArray(TSource[] source)
                : base(source)
            { }

            //override methods
            override protected IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions_Factory(TSource[] m_data)
            {
                //m_data itself serves as shared reader
                return new InternalPartitionEnumerable(m_data);
            }

            /// <summary>
            /// Enumerable of partitions over an array source.
            /// Every enumerator it hands out receives the same array and the same shared index object.
            /// </summary>
            private class InternalPartitionEnumerable : IEnumerable<KeyValuePair<long, TSource>>
            {
                //reader through which we access the source data
                private readonly TSource[] m_sharedReader;
                //shared index object passed to every enumerator; starts at -1
                private Shared<long> m_sharedIndex;

                internal InternalPartitionEnumerable(TSource[] sharedReader)
                {
                    m_sharedReader = sharedReader;
                    m_sharedIndex = new Shared<long>(-1);
                }

                IEnumerator IEnumerable.GetEnumerator()
                {
                    return ((InternalPartitionEnumerable)this).GetEnumerator();
                }


                public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
                {
                    return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
                }
            }

            /// <summary>
            /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
            /// Provides customized implementation of SourceCount property and Current property for Array
            /// </summary>
            private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSource[]>
            {
                //constructor
                internal InternalPartitionEnumerator(TSource[] sharedReader, Shared<long> sharedIndex)
                    : base(sharedReader, sharedIndex)
                { }

                //overriding methods
                override protected int SourceCount
                {
                    get { return m_sharedReader.Length; }
                }

                /// <summary>
                /// return a KeyValuePair of the current element and its key
                /// (the key is the element's index in the source array)
                /// </summary>
                /// <exception cref="InvalidOperationException">
                /// Thrown when m_currentChunkSize is still null, which the check below treats as
                /// "MoveNext has not been called yet".
                /// </exception>
                override public KeyValuePair<long, TSource> Current
                {
                    get
                    {
                        //verify that MoveNext is at least called once before Current is called
                        if (m_currentChunkSize == null)
                        {
                            throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
                        }

                        Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
                        return new KeyValuePair<long, TSource>(m_startIndex + m_localOffset.Value,
                            m_sharedReader[m_startIndex + m_localOffset.Value]);
                    }
                }
            }
        }
        #endregion 


        #region Static partitioning for IList and Array, abstract classes
        /// <summary>
        /// Static partitioning over IList and Array.
        /// - static: each partition's index range is fixed when the partitions are created
        /// - Keys are ordered within each partition
        /// - Keys are ordered across partitions
        /// - Keys are normalized
        /// - Number of partitions is fixed once specified, and the elements of the source data are
        /// distributed to each partition as evenly as possible.
        /// </summary>
        /// <typeparam name="TSource">type of the elements</typeparam>
        /// <typeparam name="TCollection">Type of the source data collection</typeparam>
        private abstract class StaticIndexRangePartitioner<TSource, TCollection> : OrderablePartitioner<TSource>
        {
            protected StaticIndexRangePartitioner()
                // OrderablePartitioner flags, in order: keysOrderedInEachPartition,
                // keysOrderedAcrossPartitions, keysNormalized
                : base(true, true, true)
            { }

            /// <summary>
            /// Abstract property to return the number of elements in the source data
            /// </summary>
            protected abstract int SourceCount { get; }

            /// <summary>
            /// Abstract method to create a partition that covers a range over source data,
            /// starting from "startIndex", ending at "endIndex"
            /// </summary>
            /// <param name="startIndex">start index of the current partition on the source data</param>
            /// <param name="endIndex">end index of the current partition on the source data</param>
            /// <returns>a partition enumerator over the specified range</returns>
            // The partitioning algorithm is implemented in GetOrderablePartitions method
            // This method delegates according to source data type IList/Array
            protected abstract IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex);

            /// <summary>
            /// Overrides OrderablePartitioner.GetOrderablePartitions
            /// Return a list of partitions, each of which enumerate a fixed part of the source data
            /// The elements of the source data are distributed to each partition as evenly as possible.
            /// Specifically, if the total number of elements is N, and number of partitions is x, and N = a*x +b,
            /// where a is the quotient, and b is the remainder. Then the first b partitions each has a + 1 elements,
            /// and the last x-b partitions each has a elements.
            /// For example, if N=10, x =3, then
            ///    partition 0 ranges [0,3],
            ///    partition 1 ranges [4,6],
            ///    partition 2 ranges [7,9].
            /// This also takes care of the situation of (x>N), the last x-N partitions are empty enumerators.
            /// An empty enumerator is indicated by
            ///      (m_startIndex == list.Count && m_endIndex == list.Count -1)
            /// </summary>
            /// <param name="partitionCount">specified number of partitions</param>
            /// <returns>a list of partitions</returns>
            /// <exception cref="ArgumentOutOfRangeException">
            /// <paramref name="partitionCount"/> is less than or equal to zero.
            /// </exception>
            override public IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount)
            {
                if (partitionCount <= 0)
                {
                    throw new ArgumentOutOfRangeException("partitionCount");
                }

                int quotient, remainder;
                quotient = Math.DivRem(SourceCount, partitionCount, out remainder);

                IEnumerator<KeyValuePair<long, TSource>>[] partitions = new IEnumerator<KeyValuePair<long, TSource>>[partitionCount];
                int lastEndIndex = -1;
                for (int i = 0; i < partitionCount; i++)
                {
                    // Ranges are contiguous: each partition starts right after the previous one ended.
                    int startIndex = lastEndIndex + 1;

                    if (i < remainder)
                    {
                        // one of the first 'remainder' partitions: quotient + 1 elements
                        lastEndIndex = startIndex + quotient;
                    }
                    else
                    {
                        // quotient elements; when quotient == 0 this yields endIndex == startIndex - 1,
                        // i.e. an empty range
                        lastEndIndex = startIndex + quotient - 1;
                    }
                    partitions[i] = CreatePartition(startIndex, lastEndIndex);
                }
                return partitions;
            }
        }

        /// <summary>
        /// Static Partition for IList/Array.
        /// This class implements all methods required by IEnumerator interface, except for the Current property.
        /// Current Property is different for IList and Array. Arrays calls 'ldelem' instructions for faster element
        /// retrieval.
        /// </summary>
        /// <typeparam name="TSource">type of the elements in the source data</typeparam>
        //We assume the source collection is not being updated concurrently. Otherwise it will break the
        //static partitioning, since each partition operates on the source collection directly, it does
        //not have a local cache of the elements assigned to them.
        private abstract class StaticIndexRangePartition<TSource> : IEnumerator<KeyValuePair<long, TSource>>
        {
            //the start and end position in the source collection for the current partition
            //the partition is empty if and only if
            // (m_startIndex == m_data.Count && m_endIndex == m_data.Count-1)
            protected readonly int m_startIndex;
            protected readonly int m_endIndex;

            //the current index of the current partition while enumerating on the source collection
            protected volatile int m_offset;

            /// <summary>
            /// Constructs an instance of StaticIndexRangePartition
            /// </summary>
            /// <param name="startIndex">the start index in the source collection for the current partition </param>
            /// <param name="endIndex">the end index in the source collection for the current partition</param>
            protected StaticIndexRangePartition(int startIndex, int endIndex)
            {
                m_startIndex = startIndex;
                m_endIndex = endIndex;
                //positioned one before the first element, until MoveNext is called
                m_offset = startIndex - 1;
            }

            /// <summary>
            /// Current Property is different for IList and Array. Arrays calls 'ldelem' instructions for faster
            /// element retrieval.
            /// </summary>
            public abstract KeyValuePair<long, TSource> Current { get; }

            /// <summary>
            /// We don't dispose the source for IList and array
            /// </summary>
            public void Dispose()
            { }

            /// <summary>
            /// Resetting a partition is not supported.
            /// </summary>
            /// <exception cref="NotSupportedException">Always thrown.</exception>
            public void Reset()
            {
                throw new NotSupportedException();
            }

            /// <summary>
            /// Moves to the next item
            /// Before the first MoveNext is called: m_offset == m_startIndex-1;
            /// </summary>
            /// <returns>true if successful, false if there is no item left</returns>
            public bool MoveNext()
            {
                if (m_offset < m_endIndex)
                {
                    m_offset++;
                    return true;
                }
                else
                {
                    //After we have enumerated over all elements, we set m_offset to m_endIndex +1.
                    //The reason we do this is, for an empty enumerator, we need to tell the Current
                    //property whether MoveNext has been called or not.
                    //For an empty enumerator, it starts with (m_offset == m_startIndex-1 == m_endIndex),
                    //and we don't set a new value to m_offset, then the above condition will always be
                    //true, and the Current property will mistakenly assume MoveNext is never called.
                    m_offset = m_endIndex + 1;
                    return false;
                }
            }

            /// <summary>
            /// Non-generic IEnumerator.Current; returns the generic Current boxed as Object.
            /// </summary>
            Object IEnumerator.Current
            {
                get
                {
                    return ((StaticIndexRangePartition<TSource>)this).Current;
                }
            }
        }
        #endregion 

        #region Static partitioning for IList 
        /// <summary>
        /// Inherits from StaticIndexRangePartitioner
        /// Provides customized implementation of SourceCount and CreatePartition for IList
        /// </summary>
        /// <typeparam name="TSource">type of the elements in the source list</typeparam>
        private class StaticIndexRangePartitionerForIList<TSource> : StaticIndexRangePartitioner<TSource, IList<TSource>>
        {
            //the source list handed to every partition created by this partitioner
            IList<TSource> m_list;

            internal StaticIndexRangePartitionerForIList(IList<TSource> list)
                : base()
            {
                Contract.Assert(list != null);
                m_list = list;
            }

            /// <summary>
            /// Number of elements in the source list (IList.Count).
            /// </summary>
            override protected int SourceCount
            {
                get { return m_list.Count; }
            }

            /// <summary>
            /// Creates a partition over the source list for the index range given by
            /// <paramref name="startIndex"/> and <paramref name="endIndex"/>.
            /// </summary>
            override protected IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex)
            {
                return new StaticIndexRangePartitionForIList<TSource>(m_list, startIndex, endIndex);
            }
        }

        /// <summary>
        /// Inherits from StaticIndexRangePartition
        /// Provides customized implementation of Current property
        /// </summary>
        /// <typeparam name="TSource">type of the elements in the source list</typeparam>
        private class StaticIndexRangePartitionForIList<TSource> : StaticIndexRangePartition<TSource>
        {
            //the source collection shared by all partitions
            private volatile IList<TSource> m_list;

            internal StaticIndexRangePartitionForIList(IList<TSource> list, int startIndex, int endIndex)
                : base(startIndex, endIndex)
            {
                Contract.Assert(startIndex >= 0 && endIndex <= list.Count - 1);
                m_list = list;
            }

            /// <summary>
            /// Gets the pair (index in the source list, element at that index) for the current position.
            /// </summary>
            /// <exception cref="InvalidOperationException">
            /// Thrown when m_offset is still below m_startIndex, which the check below treats as
            /// "MoveNext has not been called yet".
            /// </exception>
            override public KeyValuePair<long, TSource> Current
            {
                get
                {
                    //verify that MoveNext is at least called once before Current is called
                    if (m_offset < m_startIndex)
                    {
                        throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
                    }

                    Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex);
                    return (new KeyValuePair<long, TSource>(m_offset, m_list[m_offset]));
                }
            }
        }
        #endregion 

        #region static partitioning for Arrays
        /// <summary>
        /// Inherits from StaticIndexRangePartitioner
        /// Provides customized implementation of SourceCount and CreatePartition for Array
        /// </summary>
        /// <typeparam name="TSource">type of the elements in the source array</typeparam>
        private class StaticIndexRangePartitionerForArray<TSource> : StaticIndexRangePartitioner<TSource, TSource[]>
        {
            //the source array handed to every partition created by this partitioner
            TSource[] m_array;

            internal StaticIndexRangePartitionerForArray(TSource[] array)
                : base()
            {
                Contract.Assert(array != null);
                m_array = array;
            }

            /// <summary>
            /// Number of elements in the source array (Array.Length).
            /// </summary>
            override protected int SourceCount
            {
                get { return m_array.Length; }
            }

            /// <summary>
            /// Creates a partition over the source array for the index range given by
            /// <paramref name="startIndex"/> and <paramref name="endIndex"/>.
            /// </summary>
            override protected IEnumerator<KeyValuePair<long, TSource>> CreatePartition(int startIndex, int endIndex)
            {
                return new StaticIndexRangePartitionForArray<TSource>(m_array, startIndex, endIndex);
            }
        }
 
        /// <summary>
        /// Inherits from StaticIndexRangePartition
        /// Provides customized implementation of Current property for Array
        /// </summary>
        /// <typeparam name="TSource">type of the elements in the source array</typeparam>
        private class StaticIndexRangePartitionForArray<TSource> : StaticIndexRangePartition<TSource>
        {
            //the source collection shared by all partitions
            private volatile TSource[] m_array;

            internal StaticIndexRangePartitionForArray(TSource[] array, int startIndex, int endIndex)
                : base(startIndex, endIndex)
            {
                Contract.Assert(startIndex >= 0 && endIndex <= array.Length - 1);
                m_array = array;
            }

            /// <summary>
            /// Gets the pair (index in the source array, element at that index) for the current position.
            /// </summary>
            /// <exception cref="InvalidOperationException">
            /// Thrown when m_offset is still below m_startIndex, which the check below treats as
            /// "MoveNext has not been called yet".
            /// </exception>
            override public KeyValuePair<long, TSource> Current
            {
                get
                {
                    //verify that MoveNext is at least called once before Current is called
                    if (m_offset < m_startIndex)
                    {
                        throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
                    }

                    Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex);
                    return (new KeyValuePair<long, TSource>(m_offset, m_array[m_offset]));
                }
            }
        }
        #endregion
 

        #region Utility functions 
        /// <summary>
        /// A very simple primitive that allows us to share a value across multiple threads.
        /// The value lives in a field of a reference type, so every holder of the same
        /// Shared instance reads and writes the same storage location.
        /// </summary>
        /// <typeparam name="TSource">type of the shared value</typeparam>
        private class Shared<TSource>
        {
            // Deliberately a field rather than a property: callers in this file pass it by
            // reference (e.g. "ref m_sharedIndex.Value" to Interlocked.CompareExchange).
            internal TSource Value;

            internal Shared(TSource value)
            {
                this.Value = value;
            }

        }

        //-------------------- 
        // The following part calculates the default chunk size. It is copied from System.Linq.Parallel.Scheduling,
        // because mscorlib.dll cannot access System.Linq.Parallel.Scheduling 
        //-------------------- 

        // The number of bytes we want "chunks" to be, when partitioning, etc. We choose 4 cache
        // lines worth, assuming a 128-byte cache line.  Most (popular) architectures use 64-byte
        // cache lines, but choosing 128 bytes works for 64-byte lines too, whereas a multiple of
        // 64 bytes isn't necessarily sufficient for 128-byte cache systems.  So 128 bytes it is.
        private const int DEFAULT_BYTES_PER_CHUNK = 128 * 4;

        /// <summary>
        /// Calculates the default chunk size (number of elements per chunk) for elements of
        /// type <typeparamref name="TSource"/>.
        /// </summary>
        /// <typeparam name="TSource">type of the elements being partitioned</typeparam>
        /// <returns>
        /// - explicit-layout value types: DEFAULT_BYTES_PER_CHUNK divided by the marshaled size, at least 1;
        /// - all other value types: 128;
        /// - reference types: DEFAULT_BYTES_PER_CHUNK divided by the pointer size.
        /// </returns>
        private static int GetDefaultChunkSize<TSource>()
        {
            int chunkSize;

            if (typeof(TSource).IsValueType)
            {
                // The element size is only measured (via Marshal.SizeOf) for value types that
                // declare an explicit layout; all other value types use a fixed element count.
                if (typeof(TSource).StructLayoutAttribute.Value == LayoutKind.Explicit)
                {
                    chunkSize = Math.Max(1, DEFAULT_BYTES_PER_CHUNK / Marshal.SizeOf(typeof(TSource)));
                }
                else
                {
                    // We choose '128' because this ensures, no matter the actual size of the value type,
                    // the total bytes used will be a multiple of 128. This ensures it's cache aligned.
                    chunkSize = 128;
                }
            }
            else
            {
                // Reference types: each element occupies one pointer-sized slot.
                Contract.Assert((DEFAULT_BYTES_PER_CHUNK % IntPtr.Size) == 0, "bytes per chunk should be a multiple of pointer size");
                chunkSize = (DEFAULT_BYTES_PER_CHUNK / IntPtr.Size);
            }
            return chunkSize;
        }
        #endregion
 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK