SynchronousChannelMergeEnumerator.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Merging / SynchronousChannelMergeEnumerator.cs / 1305376 / SynchronousChannelMergeEnumerator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// SynchronousChannelMergeEnumerator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Diagnostics.Contracts; 

namespace System.Linq.Parallel 
{ 
    /// 
    /// This enumerator merges multiple input channels into a single output stream. The merging process just 
    /// goes from left-to-right, enumerating each channel in succession in its entirety.
    /// Assumptions:
    ///     Before enumerating this object, all producers for all channels must have finished enqueueing new
    ///     elements. 
    /// 
    ///  
    internal sealed class SynchronousChannelMergeEnumerator : MergeEnumerator 
    {
        private SynchronousChannel[] m_channels; // The channel array we will enumerate, from left-to-right. 
        private int m_channelIndex; // The current channel index. This moves through the array as we enumerate.
        private T m_currentElement; // The last element remembered during enumeration.

        //------------------------------------------------------------------------------------ 
        // Instantiates a new enumerator for a set of channels.
        // 
 
        internal SynchronousChannelMergeEnumerator(
            QueryTaskGroupState taskGroupState, SynchronousChannel[] channels) : base(taskGroupState) 
        {
            Contract.Assert(channels != null);
#if DEBUG
            foreach (SynchronousChannel c in channels) Contract.Assert(c != null); 
#endif
 
            m_channels = channels; 
            m_channelIndex = -1;
        } 

        //-----------------------------------------------------------------------------------
        // Retrieves the current element.
        // 
        // Notes:
        //     This throws if we haven't begun enumerating or have gone past the end of the 
        //     data source. 
        //
 
        public override T Current
        {
            get
            { 
                // If we're at the beginning or the end of the array, it's invalid to be
                // retrieving the current element. We throw. 
                if (m_channelIndex == -1 || m_channelIndex == m_channels.Length) 
                {
                    throw new InvalidOperationException(SR.GetString(SR.PLINQ_CommonEnumerator_Current_NotStarted)); 
                }

                return m_currentElement;
            } 
        }
 
        //----------------------------------------------------------------------------------- 
        // Positions the enumerator over the next element. This includes merging as we
        // enumerate, by just incrementing indexes, etc. 
        //
        // Return Value:
        //     True if there's a current element, false if we've reached the end.
        // 

        public override bool MoveNext() 
        { 
            Contract.Assert(m_channels != null);
 
            // If we're at the start, initialize the index.
            if (m_channelIndex == -1)
            {
                m_channelIndex = 0; 
            }
 
            // If the index has reached the end, we bail. 
            while (m_channelIndex != m_channels.Length)
            { 
                SynchronousChannel current = m_channels[m_channelIndex];
                Contract.Assert(current != null);

                if (current.Count == 0) 
                {
                    // We're done with this channel, move on to the next one. We don't 
                    // have to check that it's "done" since this is a synchronous consumer. 
                    m_channelIndex++;
                } 
                else
                {
                    // Remember the "current" element and return.
                    m_currentElement = current.Dequeue(); 
                    return true;
                } 
            } 

            TraceHelpers.TraceInfo("[timing]: {0}: Completed the merge", DateTime.Now.Ticks); 

            // If we got this far, it means we've exhausted our channels.
            Contract.Assert(m_channelIndex == m_channels.Length);
 
            return false;
        } 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// SynchronousChannelMergeEnumerator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Diagnostics.Contracts; 

namespace System.Linq.Parallel 
{ 
    /// 
    /// This enumerator merges multiple input channels into a single output stream. The merging process just 
    /// goes from left-to-right, enumerating each channel in succession in its entirety.
    /// Assumptions:
    ///     Before enumerating this object, all producers for all channels must have finished enqueueing new
    ///     elements. 
    /// 
    ///  
    internal sealed class SynchronousChannelMergeEnumerator : MergeEnumerator 
    {
        private SynchronousChannel[] m_channels; // The channel array we will enumerate, from left-to-right. 
        private int m_channelIndex; // The current channel index. This moves through the array as we enumerate.
        private T m_currentElement; // The last element remembered during enumeration.

        //------------------------------------------------------------------------------------ 
        // Instantiates a new enumerator for a set of channels.
        // 
 
        internal SynchronousChannelMergeEnumerator(
            QueryTaskGroupState taskGroupState, SynchronousChannel[] channels) : base(taskGroupState) 
        {
            Contract.Assert(channels != null);
#if DEBUG
            foreach (SynchronousChannel c in channels) Contract.Assert(c != null); 
#endif
 
            m_channels = channels; 
            m_channelIndex = -1;
        } 

        //-----------------------------------------------------------------------------------
        // Retrieves the current element.
        // 
        // Notes:
        //     This throws if we haven't begun enumerating or have gone past the end of the 
        //     data source. 
        //
 
        public override T Current
        {
            get
            { 
                // If we're at the beginning or the end of the array, it's invalid to be
                // retrieving the current element. We throw. 
                if (m_channelIndex == -1 || m_channelIndex == m_channels.Length) 
                {
                    throw new InvalidOperationException(SR.GetString(SR.PLINQ_CommonEnumerator_Current_NotStarted)); 
                }

                return m_currentElement;
            } 
        }
 
        //----------------------------------------------------------------------------------- 
        // Positions the enumerator over the next element. This includes merging as we
        // enumerate, by just incrementing indexes, etc. 
        //
        // Return Value:
        //     True if there's a current element, false if we've reached the end.
        // 

        public override bool MoveNext() 
        { 
            Contract.Assert(m_channels != null);
 
            // If we're at the start, initialize the index.
            if (m_channelIndex == -1)
            {
                m_channelIndex = 0; 
            }
 
            // If the index has reached the end, we bail. 
            while (m_channelIndex != m_channels.Length)
            { 
                SynchronousChannel current = m_channels[m_channelIndex];
                Contract.Assert(current != null);

                if (current.Count == 0) 
                {
                    // We're done with this channel, move on to the next one. We don't 
                    // have to check that it's "done" since this is a synchronous consumer. 
                    m_channelIndex++;
                } 
                else
                {
                    // Remember the "current" element and return.
                    m_currentElement = current.Dequeue(); 
                    return true;
                } 
            } 

            TraceHelpers.TraceInfo("[timing]: {0}: Completed the merge", DateTime.Now.Ticks); 

            // If we got this far, it means we've exhausted our channels.
            Contract.Assert(m_channelIndex == m_channels.Length);
 
            return false;
        } 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK