AsynchronousChannelMergeEnumerator.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Merging / AsynchronousChannelMergeEnumerator.cs / 1305376 / AsynchronousChannelMergeEnumerator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// AsynchronousChannelMergeEnumerator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Threading; 
using System.Diagnostics.Contracts;
 
namespace System.Linq.Parallel 
{
    /// <summary>
    /// An enumerator that merges multiple one-to-one channels into a single output
    /// stream, including any necessary blocking and synchronization. This is an
    /// asynchronous enumerator, i.e. the producers may be inserting items into the
    /// channels concurrently with the consumer taking items out of them. Therefore,
    /// enumerating this object can cause the current thread to block.
    ///
    /// We use a biased choice algorithm to choose from our consumer channels. I.e. we
    /// will prefer to process elements in a fair round-robin fashion, but will
    /// occasionally bypass this if a channel is empty.
    /// </summary>
    /// <typeparam name="T">The type of the elements dequeued from the channels.</typeparam>
    internal sealed class AsynchronousChannelMergeEnumerator<T> : MergeEnumerator<T>
    {
        private readonly AsynchronousChannel<T>[] m_channels; // The channels being enumerated.
        private ManualResetEventSlim[] m_channelEvents; // Events for the channels being enumerated (allocated lazily on first wait).
        private readonly bool[] m_done;       // Tracks which channels are done.
        private int m_channelIndex;  // The next channel from which we'll dequeue (-1 = not started, Length = finished).
        private T m_currentElement;  // The remembered element from the previous MoveNext.

        //------------------------------------------------------------------------------------
        // Allocates a new enumerator over a set of one-to-one channels.
        //
        // Arguments:
        //     taskGroupState - forwarded to the base class; QueryEnd is invoked on it once
        //                      all channels have been exhausted
        //     channels       - the channels to merge; must be non-null with non-null entries
        //

        internal AsynchronousChannelMergeEnumerator(
            QueryTaskGroupState taskGroupState, AsynchronousChannel<T>[] channels) : base(taskGroupState)
        {
            Contract.Assert(channels != null);
#if DEBUG
            foreach (AsynchronousChannel<T> c in channels) Contract.Assert(c != null);
#endif

            m_channels = channels;
            m_channelIndex = -1; // To catch calls to Current before MoveNext.
            m_done = new bool[m_channels.Length]; // Initialized to { false }, i.e. no channels done.
        }

        //-----------------------------------------------------------------------------------
        // Retrieves the current element.
        //
        // Notes:
        //     This throws if we haven't begun enumerating or have gone past the end of the
        //     data source.
        //

        public override T Current
        {
            get
            {
                // -1 means MoveNext was never called; Length means the merge has completed.
                if (m_channelIndex == -1 || m_channelIndex == m_channels.Length)
                {
                    throw new InvalidOperationException(SR.GetString(SR.PLINQ_CommonEnumerator_Current_NotStarted));
                }

                return m_currentElement;
            }
        }

        //-----------------------------------------------------------------------------------
        // Positions the enumerator over the next element. This includes merging as we
        // enumerate, which may also involve waiting for producers to generate new items.
        //
        // Return Value:
        //     True if there's a current element, false if we've reached the end.
        //

        public override bool MoveNext()
        {
            // On the first call to MoveNext, we advance the position to a real channel.
            int index = m_channelIndex;
            if (index == -1)
            {
                m_channelIndex = index = 0;
            }

            // If we're past the end, enumeration is done.
            if (index == m_channels.Length)
            {
                return false;
            }

            // Else try the fast path: the next channel in round-robin order has an item ready.
            if (!m_done[index] && m_channels[index].TryDequeue(ref m_currentElement))
            {
                m_channelIndex = (index + 1) % m_channels.Length;
                return true;
            }

            return MoveNextSlowPath();
        }

        //-----------------------------------------------------------------------------------
        // The slow path used when the fast-path dequeue in MoveNext didn't come up
        // with anything. We may need to block and/or mark channels as done.
        //
        // Return Value:
        //     True if an element was dequeued into m_currentElement, false once every
        //     channel is done and empty (after which QueryEnd has been invoked).
        //

        private bool MoveNextSlowPath()
        {
            int doneChannels = 0;

            // Remember the first channel we are looking at. If we pass through all of the
            // channels without finding an element, we will go to sleep.
            int firstChannelIndex = m_channelIndex;

            int currChannelIndex;
            while ((currChannelIndex = m_channelIndex) != m_channels.Length)
            {
                AsynchronousChannel<T> current = m_channels[currChannelIndex];

                bool isDone = m_done[currChannelIndex];
                if (!isDone && current.TryDequeue(ref m_currentElement))
                {
                    // The channel has an item to be processed. We already remembered the current
                    // element (TryDequeue stores it through its by-ref parameter), so we just
                    // return true after advancing to the next channel.
                    m_channelIndex = (currChannelIndex + 1) % m_channels.Length;
                    return true;
                }
                else
                {
                    // There isn't an element in the current channel. Check whether the channel
                    // is done before possibly waiting for an element to arrive.
                    if (!isDone && current.IsDone)
                    {
                        // We must check to ensure an item didn't get enqueued after originally
                        // trying to dequeue above and reading the IsDone flag. If there are still
                        // elements, the producer may have marked the channel as done but of course
                        // we still need to continue processing them.
                        if (!current.IsChunkBufferEmpty)
                        {
                            bool dequeueResult = current.TryDequeue(ref m_currentElement);
                            Contract.Assert(dequeueResult, "channel isn't empty, yet the dequeue failed, hmm");

                            // Note: the channel index is deliberately left where it is, so the
                            // next call looks at this same channel again.
                            return true;
                        }

                        // Mark this channel as being truly done. We won't consider it any longer.
                        m_done[currChannelIndex] = true;
                        if (m_channelEvents != null)
                        {
                            m_channelEvents[currChannelIndex] = null; //we definitely never want to wait on this (soon to be disposed) event.
                        }

                        isDone = true;
                        current.Dispose();
                    }

                    if (isDone)
                    {
                        Contract.Assert(m_channels[currChannelIndex].IsDone, "thought this channel was done");
                        Contract.Assert(m_channels[currChannelIndex].IsChunkBufferEmpty, "thought this channel was empty");

                        // Increment the count of done channels that we've seen. If this reaches the
                        // total number of channels, we know we're finally done.
                        if (++doneChannels == m_channels.Length)
                        {
                            // Remember that we are done by setting the index past the end.
                            m_channelIndex = currChannelIndex = m_channels.Length;
                            break;
                        }
                    }

                    // Still no element. Advance to the next channel and continue searching.
                    m_channelIndex = currChannelIndex = (currChannelIndex + 1) % m_channels.Length;

                    // If the channels aren't done, and we've inspected all of the queues and still
                    // haven't found anything, we will go ahead and wait on all the queues.
                    if (currChannelIndex == firstChannelIndex)
                    {
                        // If this is the first time we must block, lazily allocate and cache
                        // a list of events to be reused for next time. This is done before
                        // entering the try block: the finally clause below dereferences
                        // m_channelEvents, so a failed allocation inside the try would be
                        // masked by a NullReferenceException.
                        if (m_channelEvents == null)
                        {
                            m_channelEvents = new ManualResetEventSlim[m_channels.Length];
                        }

                        // On our first pass through the queues, we didn't have any side-effects
                        // that would let a producer know we are waiting. Now we go through and
                        // accumulate a set of events to wait on.
                        try
                        {
                            // Reset our done channels counter; we need to tally them again during the
                            // second pass through.
                            doneChannels = 0;

                            for (int i = 0; i < m_channels.Length; i++)
                            {
                                if (!m_done[i] && m_channels[i].TryDequeue(ref m_currentElement, ref m_channelEvents[i]))
                                {
                                    Contract.Assert(m_channelEvents[i] == null);

                                    // The channel has received an item since the last time we checked.
                                    // Just return and let the consumer process the element returned.
                                    return true;
                                }
                                else if (m_channelEvents[i] == null)
                                {
                                    // The channel in question is done and empty.
                                    Contract.Assert(m_channels[i].IsDone, "expected channel to be done");
                                    Contract.Assert(m_channels[i].IsChunkBufferEmpty, "expected channel to be empty");

                                    if (!m_done[i])
                                    {
                                        m_done[i] = true;
                                        m_channels[i].Dispose();
                                    }

                                    if (++doneChannels == m_channels.Length)
                                    {
                                        // No need to wait. All channels are done. Remember this by setting
                                        // the index past the end of the channel list.
                                        m_channelIndex = currChannelIndex = m_channels.Length;
                                        break;
                                    }
                                }
                            }

                            // If all channels are done, we can break out of the loop entirely.
                            if (currChannelIndex == m_channels.Length)
                            {
                                break;
                            }

                            // Finally, we have accumulated a set of events. Perform a wait-any on them.
                            Contract.Assert(m_channelEvents.Length <= 63,
                                            "WaitForMultipleObjects only supports 63 threads if running on an STA thread (64 otherwise).");

                            //This WaitAny() does not require cancellation support as it will wake up when all the producers into the
                            //channel have finished.  Hence, if all the producers wake up on cancellation, so will this.
                            m_channelIndex = currChannelIndex = WaitAny(m_channelEvents);
                            Contract.Assert(0 <= m_channelIndex && m_channelIndex < m_channelEvents.Length);
                            Contract.Assert(0 <= currChannelIndex && currChannelIndex < m_channelEvents.Length);
                            Contract.Assert(m_channelEvents[currChannelIndex] != null);

                            //
                            // We have woken up, and the channel that caused this is contained in the
                            // returned index. This could be due to one of two reasons. Either the channel's
                            // producer has notified that it is done, in which case we just have to take it
                            // out of our current wait-list and redo the wait, or a channel actually has an
                            // item which we will go ahead and process.
                            //
                            // We just go back 'round the loop to accomplish this logic. Reset the channel
                            // index and # of done channels. Go back to the beginning, starting with the channel
                            // that caused us to wake up.
                            //

                            firstChannelIndex = currChannelIndex;
                            doneChannels = 0;
                        }
                        finally
                        {
                            // We have to guarantee that any waits we said we would perform are undone.
                            // This runs on every exit from the try block: a successful dequeue, the
                            // all-done break, a normal wake-up, or an exception.
                            for (int i = 0; i < m_channelEvents.Length; i++)
                            {
                                // If we retrieved an event from a channel, we need to reset the wait.
                                if (m_channelEvents[i] != null)
                                {
                                    m_channels[i].DoneWithDequeueWait();
                                }
                            }
                        }
                    }
                }
            }

            TraceHelpers.TraceInfo("[timing]: {0}: Completed the merge", DateTime.Now.Ticks);

            // If we got this far, it means we've exhausted our channels.
            Contract.Assert(currChannelIndex == m_channels.Length);

            // If any tasks failed, propagate the failure now. We must do it here, because the merge
            // executor returns control back to the caller before the query has completed; contrast
            // this with synchronous enumeration where we can wait before returning.
            m_taskGroupState.QueryEnd(false);

            return false;
        }

        /// <summary>
        /// WaitAny simulates a Win32-style WaitAny on the set of thin-events. It first
        /// spins briefly polling the events, and only then falls back to a real wait on
        /// the events' wait handles.
        /// </summary>
        /// <param name="events">An array of thin-events (null elements permitted). At least
        /// one element must be non-null.</param>
        /// <returns>The index of the specific event in events that caused us to wake up.</returns>
        private static int WaitAny(ManualResetEventSlim[] events)
        {
            Contract.Assert(events != null);

            // Small spin wait loop.
            const int WAIT_ANY_SPINS = 20;
            SpinWait spinner = new SpinWait();

            for (int i = 0; i < WAIT_ANY_SPINS; i++)
            {
                for (int j = 0; j < events.Length; j++)
                {
                    if (events[j] != null && events[j].IsSet)
                    {
                        return j;
                    }
                }

                spinner.SpinOnce();
            }

            // Next, count up the non-null events so the wait arrays can be sized exactly.
            int liveEvents = 0;
            for (int i = 0; i < events.Length; i++)
            {
                if (events[i] != null)
                {
                    liveEvents++;
                }
            }

            // Lastly, accumulate the events in preparation for a true wait. The 'waitHandles'
            // array has the non-null elements "slid down" over the positions that held nulls,
            // so alongside each handle we record the index in 'events' it came from.
            WaitHandle[] waitHandles = new WaitHandle[liveEvents];
            int[] eventIndices = new int[liveEvents];
            Contract.Assert(waitHandles.Length > 0);

            for (int i = 0, j = 0; i < events.Length; i++)
            {
                if (events[i] == null)
                {
                    continue;
                }

                waitHandles[j] = events[i].WaitHandle;
                eventIndices[j] = i;
                j++;
            }

            // And finally, issue the real wait.
            int signaled = WaitHandle.WaitAny(waitHandles);

            // Translate the wait-handle position back into the events array index.
            int index = eventIndices[signaled];
            Contract.Assert(events[index] != null, "expected non-null event");

            return index;
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK