HashRepartitionEnumerator.cs source code in C# .NET

Source code for the .NET Framework in C#

Code:

4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Partitioning / HashRepartitionEnumerator.cs / 1305376

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// HashRepartitionEnumerator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Threading;
using System.Diagnostics.Contracts; 
 
namespace System.Linq.Parallel
{ 
    /// <summary>
    /// This enumerator handles the actual coordination among partitions required to
    /// accomplish the repartitioning operation, as explained above.
    /// </summary>
    /// <typeparam name="TInputOutput">The kind of elements.</typeparam>
    /// <typeparam name="THashKey">The key used to distribute elements.</typeparam>
    /// <typeparam name="TIgnoreKey">The kind of keys found in the source (ignored).</typeparam>
    internal class HashRepartitionEnumerator<TInputOutput, THashKey, TIgnoreKey>
        : QueryOperatorEnumerator<Pair<TInputOutput, THashKey>, int>
    {
        private const int ENUMERATION_NOT_STARTED = -1; // Sentinel to note we haven't begun enumerating yet.

        private readonly int m_partitionCount; // The number of partitions. 
        private readonly int m_partitionIndex; // Our unique partition index.
        private readonly Func<TInputOutput, THashKey> m_keySelector; // A key-selector function.
        private readonly HashRepartitionStream<TInputOutput, THashKey, int> m_repartitionStream; // A repartitioning stream.
        private readonly ListChunk<Pair<TInputOutput, THashKey>>[,] m_valueExchangeMatrix; // Matrix to do inter-task communication.
        private readonly QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source; // The immediate source of data.
        private CountdownEvent m_barrier; // Used to signal and wait for repartitions to complete.
        private readonly CancellationToken m_cancellationToken; // A token for canceling the process.
        private Mutables m_mutables; // Mutable fields for this enumerator.
 
        class Mutables
        { 
            internal int m_currentBufferIndex; // Current buffer index. 
            internal ListChunk<Pair<TInputOutput, THashKey>> m_currentBuffer; // The buffer we're currently enumerating.
            internal int m_currentIndex; // Current index into the buffer. 

            internal Mutables()
            {
                m_currentBufferIndex = ENUMERATION_NOT_STARTED; 
            }
        } 
 
        //----------------------------------------------------------------------------------------
        // Creates a new repartitioning enumerator.
        //
        // Arguments:
        //     source              - the data stream from which to pull elements
        //     partitionCount      - total number of partitions
        //     partitionIndex      - this operator's unique partition index
        //     keySelector         - function mapping an element to its hash key
        //     repartitionStream   - the stream object to use for partition selection
        //     barrier             - a latch used to signal task completion
        //     valueExchangeMatrix - a matrix of buffers for inter-task communication
        //     cancellationToken   - a token for responding to external cancellation
        //

        internal HashRepartitionEnumerator(
            QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source, int partitionCount, int partitionIndex,
            Func<TInputOutput, THashKey> keySelector, HashRepartitionStream<TInputOutput, THashKey, int> repartitionStream,
            CountdownEvent barrier, ListChunk<Pair<TInputOutput, THashKey>>[,] valueExchangeMatrix, CancellationToken cancellationToken)
        { 
            Contract.Assert(source != null);
            Contract.Assert(keySelector != null || typeof(THashKey) == typeof(NoKeyMemoizationRequired)); 
            Contract.Assert(repartitionStream != null);
            Contract.Assert(barrier != null);
            Contract.Assert(valueExchangeMatrix != null);
            Contract.Assert(valueExchangeMatrix.GetLength(0) == partitionCount, "expected square matrix of buffers (NxN)"); 
            Contract.Assert(valueExchangeMatrix.GetLength(1) == partitionCount, "expected square matrix of buffers (NxN)");
            Contract.Assert(0 <= partitionIndex && partitionIndex < partitionCount); 
 
            m_source = source;
            m_partitionCount = partitionCount; 
            m_partitionIndex = partitionIndex;
            m_keySelector = keySelector;
            m_repartitionStream = repartitionStream;
            m_barrier = barrier; 
            m_valueExchangeMatrix = valueExchangeMatrix;
            m_cancellationToken = cancellationToken; 
        } 

        //--------------------------------------------------------------------------------------- 
        // Retrieves the next element from this partition.  All repartitioning operators across
        // all partitions cooperate in a barrier-style algorithm.  The first time an element is
        // requested, the repartitioning operator enters the 1st phase: during this phase, it
        // scans its entire input and computes the destination partition for each element.  During
        // the 2nd phase, each partition scans the elements found for it by all other partitions
        // and yields them to callers.  The only synchronization required is the barrier itself
        // -- all other parts of this algorithm are synchronization-free. 
        //
        // Notes: One rather large penalty that this algorithm incurs is higher memory usage and a 
        // larger time-to-first-element latency, at least compared with our old implementation; this
        // happens because all input elements must be fetched before we can produce a single output
        // element.  In many cases this isn't too terrible: e.g. a GroupBy requires this to occur
        // anyway, so having the repartitioning operator do so isn't complicating matters much at all. 
        //
 
        internal override bool MoveNext(ref Pair<TInputOutput, THashKey> currentElement, ref int currentKey)
        {
            if (m_partitionCount == 1) 
            {
                // If there's only one partition, no need to do any sort of exchanges.
                TIgnoreKey keyUnused = default(TIgnoreKey);
                TInputOutput current = default(TInputOutput); 
#if DEBUG
                currentKey = unchecked((int)0xdeadbeef); 
#endif 
                if (m_source.MoveNext(ref current, ref keyUnused))
                { 
                    currentElement = new Pair<TInputOutput, THashKey>(
                        current, m_keySelector == null ? default(THashKey) : m_keySelector(current));
                    return true;
                } 
                return false;
            } 
 
            Mutables mutables = m_mutables;
            if (mutables == null) 
                mutables = m_mutables = new Mutables();

            // If we haven't enumerated the source yet, do that now.  This is the first phase
            // of a two-phase barrier style operation. 
            if (mutables.m_currentBufferIndex == ENUMERATION_NOT_STARTED)
            { 
                EnumerateAndRedistributeElements(); 
                Contract.Assert(mutables.m_currentBufferIndex != ENUMERATION_NOT_STARTED);
            } 

            // Once we've enumerated our contents, we can then go back and walk the buffers that belong
            // to the current partition.  This is phase two.  Note that we slyly move on to the first step
            // of phase two before actually waiting for other partitions.  That's because we can enumerate 
            // the buffer we wrote to above, as already noted.
            while (mutables.m_currentBufferIndex < m_partitionCount) 
            { 
                // If the queue is non-null and still has elements, yield them.
                if (mutables.m_currentBuffer != null) 
                {
                    if (++mutables.m_currentIndex < mutables.m_currentBuffer.Count)
                    {
                        // Return the current element. 
                        currentElement = mutables.m_currentBuffer.m_chunk[mutables.m_currentIndex];
                        return true; 
                    } 
                    else
                    { 
                        // If we've exhausted the current chunk, advance to the next one (if any).
                        mutables.m_currentIndex = ENUMERATION_NOT_STARTED;
                        mutables.m_currentBuffer = mutables.m_currentBuffer.Next;
                        Contract.Assert(mutables.m_currentBuffer == null || mutables.m_currentBuffer.Count > 0); 
                        continue; // Go back around and invoke this same logic.
                    } 
                } 

                // We're done with the current partition.  Slightly different logic depending on whether 
                // we're on our own buffer or one that somebody else found for us.
                if (mutables.m_currentBufferIndex == m_partitionIndex)
                {
                    // We now need to wait at the barrier, in case some other threads aren't done. 
                    // Once we wake up, we reset our index and will increment it immediately after.
                    m_barrier.Wait(m_cancellationToken); 
                    mutables.m_currentBufferIndex = ENUMERATION_NOT_STARTED; 
                }
 
                // Advance to the next buffer.
                mutables.m_currentBufferIndex++;
                mutables.m_currentIndex = ENUMERATION_NOT_STARTED;
 
                if (mutables.m_currentBufferIndex == m_partitionIndex)
                { 
                    // Skip our current buffer (since we already enumerated it). 
                    mutables.m_currentBufferIndex++;
                } 

                // Assuming we're within bounds, retrieve the next buffer object.
                if (mutables.m_currentBufferIndex < m_partitionCount)
                { 
                    mutables.m_currentBuffer = m_valueExchangeMatrix[mutables.m_currentBufferIndex, m_partitionIndex];
                } 
            } 

            // We're done. No more buffers to enumerate. 
            return false;
        }

        //--------------------------------------------------------------------------------------- 
        // Called when this enumerator is first enumerated; it must walk through the source
        // and redistribute elements to their slot in the exchange matrix. 
        // 

        private void EnumerateAndRedistributeElements() 
        {
            Mutables mutables = m_mutables;
            Contract.Assert(mutables != null);
 
            ListChunk<Pair<TInputOutput, THashKey>>[] privateBuffers = new ListChunk<Pair<TInputOutput, THashKey>>[m_partitionCount];
 
            TInputOutput element = default(TInputOutput); 
            TIgnoreKey ignoreKey = default(TIgnoreKey);
            int loopCount = 0; 
            while (m_source.MoveNext(ref element, ref ignoreKey))
            {
                if ((loopCount++ & CancellationState.POLL_INTERVAL) == 0)
                    CancellationState.ThrowIfCanceled(m_cancellationToken); 

                // Calculate the element's destination partition index, placing it into the 
                // appropriate buffer from which partitions will later enumerate. 
                int destinationIndex;
                THashKey elementHashKey = default(THashKey); 
                if (m_keySelector != null)
                {
                    elementHashKey = m_keySelector(element);
                    destinationIndex = m_repartitionStream.GetHashCode(elementHashKey) % m_partitionCount; 
                }
                else 
                { 
                    Contract.Assert(typeof(THashKey) == typeof(NoKeyMemoizationRequired));
                    destinationIndex = m_repartitionStream.GetHashCode(element) % m_partitionCount; 
                }

                Contract.Assert(0 <= destinationIndex && destinationIndex < m_partitionCount,
                                "destination partition outside of the legal range of partitions"); 

                // Get the buffer for the destination partition, lazily allocating it if needed.  We
                // maintain this list in our own private cache so that we avoid accessing shared memory
                // locations too much.  In the original implementation, we'd access the buffer in the
                // matrix ([N,M], where N is the current partition and M is the destination), but some
                // rudimentary performance profiling indicates copying at the end performs better.
                ListChunk<Pair<TInputOutput, THashKey>> buffer = privateBuffers[destinationIndex];
                if (buffer == null)
                {
                    const int INITIAL_PRIVATE_BUFFER_SIZE = 128;
                    privateBuffers[destinationIndex] = buffer = new ListChunk<Pair<TInputOutput, THashKey>>(INITIAL_PRIVATE_BUFFER_SIZE);
                }

                buffer.Add(new Pair<TInputOutput, THashKey>(element, elementHashKey));
            }

            // Copy the local buffers to the shared space and then signal to other threads that
            // we are done.  We can then immediately move on to enumerating the elements we found 
            // for the current partition before waiting at the barrier.  If we found a lot, we will
            // hopefully never have to physically wait. 
            for (int i = 0; i < m_partitionCount; i++) 
            {
                m_valueExchangeMatrix[m_partitionIndex, i] = privateBuffers[i]; 
            }

            m_barrier.Signal();
 
            // Begin at our own buffer.
            mutables.m_currentBufferIndex = m_partitionIndex; 
            mutables.m_currentBuffer = privateBuffers[m_partitionIndex]; 
            mutables.m_currentIndex = ENUMERATION_NOT_STARTED;
        } 

        protected override void Dispose(bool disposed)
        {
            if (m_barrier != null) 
            {
                // Since this enumerator is being disposed, decrement the barrier so that
                // other enumerators waiting on it are not blocked forever.
                if (m_mutables == null || (m_mutables.m_currentBufferIndex == ENUMERATION_NOT_STARTED))
                { 
                    m_barrier.Signal();
                    m_barrier = null;
                }
 
                m_source.Dispose();
            } 
        } 
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
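
Notes:

The two-phase exchange that MoveNext and EnumerateAndRedistributeElements implement can be modeled outside of PLINQ. The sketch below is a minimal, self-contained illustration of the same pattern, not the PLINQ implementation: every name in it is invented, dedicated threads stand in for PLINQ's query tasks (a barrier-style wait could deadlock under Parallel.For if iterations ran sequentially on too few threads), and List<int> stands in for ListChunk. Each worker buckets its input privately, publishes the buckets into a shared matrix, signals a CountdownEvent, drains its own bucket immediately, and only waits at the barrier before touching buckets written by others.

using System;
using System.Collections.Generic;
using System.Threading;

// Standalone model of the two-phase exchange; invented names, not PLINQ code.
internal static class TwoPhaseExchangeDemo
{
    internal static void Main()
    {
        const int partitionCount = 4;

        // matrix[writer, reader] holds the elements 'writer' found for 'reader'.
        var matrix = new List<int>[partitionCount, partitionCount];
        var barrier = new CountdownEvent(partitionCount);

        var threads = new Thread[partitionCount];
        for (int p = 0; p < partitionCount; p++)
        {
            int index = p; // Capture a stable copy for the closure.
            threads[p] = new Thread(() =>
            {
                // Phase 1: scan this partition's slice of the input, bucketing
                // each element by destination into private buffers.
                var privateBuffers = new List<int>[partitionCount];
                for (int element = index; element < 1000; element += partitionCount)
                {
                    int dest = (element.GetHashCode() & int.MaxValue) % partitionCount;
                    if (privateBuffers[dest] == null)
                        privateBuffers[dest] = new List<int>();
                    privateBuffers[dest].Add(element);
                }

                // Publish the buffers to the shared matrix, then signal arrival.
                for (int i = 0; i < partitionCount; i++)
                    matrix[index, i] = privateBuffers[i];
                barrier.Signal();

                // Phase 2: our own bucket can be drained without waiting...
                int received = privateBuffers[index] == null ? 0 : privateBuffers[index].Count;

                // ...but we must wait at the barrier before reading anyone else's.
                barrier.Wait();
                for (int writer = 0; writer < partitionCount; writer++)
                {
                    if (writer == index) continue;
                    List<int> buffer = matrix[writer, index];
                    if (buffer != null) received += buffer.Count;
                }
                Console.WriteLine("partition " + index + " received " + received + " elements");
            });
            threads[p].Start();
        }

        foreach (Thread t in threads) t.Join();
    }
}

As the comments in the source note, the private buffers exist so that each worker touches the shared matrix only once per destination, when publishing at the end of phase one.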
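The destination computation, GetHashCode(key) % m_partitionCount, can only satisfy the range assert if the hash is non-negative, because C#'s % operator preserves the sign of the dividend. The helper below is a hedged sketch of the sign-bit masking the repartition stream is assumed to perform internally; the name and signature are invented for illustration.

using System.Collections.Generic;

internal static class PartitionMath
{
    // Maps an element to a partition in [0, partitionCount).  The sign bit is
    // cleared first because GetHashCode() may return a negative value, and
    // C#'s '%' would then yield a negative remainder.
    internal static int GetDestinationIndex<T>(T element, IEqualityComparer<T> comparer, int partitionCount)
    {
        int hashCode = element == null ? 0 : comparer.GetHashCode(element);
        return (hashCode & int.MaxValue) % partitionCount;
    }
}

For example, PartitionMath.GetDestinationIndex("abc", EqualityComparer<string>.Default, 8) always lands in [0, 8), even when the string's raw hash code is negative.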
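The cancellation test (loopCount++ & CancellationState.POLL_INTERVAL) == 0 fires periodically only if POLL_INTERVAL has the form 2^k - 1, so the AND masks the counter down to its low bits. Below is a small sketch of the same polling pattern against the public CancellationToken API; the mask value 63 is an assumption, not taken from the source.

using System.Collections.Generic;
using System.Threading;

internal static class PollingSketch
{
    // Must be a power of two minus one (here 63) so that the bitwise AND
    // zeroes out exactly once every 64 iterations.
    private const int PollMask = 63;

    internal static long Sum(IEnumerable<int> source, CancellationToken token)
    {
        long sum = 0;
        int loopCount = 0;
        foreach (int value in source)
        {
            // Poll for cancellation once per 64 elements rather than per element,
            // keeping the common case a single increment and AND.
            if ((loopCount++ & PollMask) == 0)
                token.ThrowIfCancellationRequested();
            sum += value;
        }
        return sum;
    }
}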
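ListChunk<T>, the type used for the exchange buffers, is internal to PLINQ. The stand-in below captures its general shape under stated assumptions: fixed-size arrays linked through a Next pointer and appended at a cached tail. All member names are guesses for illustration, and the real type may differ (for instance by growing its chunk sizes).

using System.Collections;
using System.Collections.Generic;

// Simplified chunked list: appends go into fixed-size arrays linked together,
// so growth never copies previously added elements.
internal class ChunkedList<T> : IEnumerable<T>
{
    internal readonly T[] m_chunk;     // This chunk's storage.
    internal int Count;                // Elements used in this chunk.
    internal ChunkedList<T> Next;      // The following chunk, if any.
    private ChunkedList<T> m_tail;     // Chunk currently accepting adds (head only).

    internal ChunkedList(int chunkSize)
    {
        m_chunk = new T[chunkSize];
        m_tail = this;
    }

    internal void Add(T item)
    {
        ChunkedList<T> tail = m_tail;
        if (tail.Count == tail.m_chunk.Length)
        {
            // Current chunk is full: link a fresh one and append there.
            tail.Next = new ChunkedList<T>(tail.m_chunk.Length);
            tail = tail.Next;
            m_tail = tail;
        }
        tail.m_chunk[tail.Count++] = item;
    }

    public IEnumerator<T> GetEnumerator()
    {
        for (ChunkedList<T> chunk = this; chunk != null; chunk = chunk.Next)
            for (int i = 0; i < chunk.Count; i++)
                yield return chunk.m_chunk[i];
    }

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
}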