InlinedAggregationOperator.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Inlined / InlinedAggregationOperator.cs / 1305376 / InlinedAggregationOperator.cs

// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// InlinedAggregationOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
using System.Threading; 
 
namespace System.Linq.Parallel
{ 
    /// <summary>
    /// Shared base class for the "inlined" versions of the various aggregations.  Inlined
    /// operators perform their elementary operations with real MSIL instructions instead of
    /// general purpose delegate-based binary operators.  That is noticeably more efficient,
    /// at the price of some code duplication across the concrete operators.
    /// </summary>
    /// <remarks>
    /// NOTE(review): TResult is used below but no generic parameter list is visible on the
    /// class declaration in this listing; the type arguments look like they were stripped
    /// when the file was extracted -- confirm against the original source.
    /// </remarks>
    internal abstract class InlinedAggregationOperator :
        UnaryQueryOperator
    {

        //----------------------------------------------------------------------------------------
        // Constructs a new inlined aggregation operator on top of the given child.
        //
        // Arguments:
        //     child - the child data source; asserted to be non-null once the base
        //             constructor has run
        //

        internal InlinedAggregationOperator(IEnumerable child)
            : base(child)
        {
            Contract.Assert(child != null, "child data source cannot be null");
        }

        //---------------------------------------------------------------------------------------
        // Runs the operator-specific aggregation (InternalAggregate) and returns its result.
        //
        // Exception policy:
        //     - ThreadAbortException and AggregateException propagate unchanged.
        //     - An OperationCanceledException propagates unchanged only when it carries this
        //       query's external cancellation token and that token has cancellation requested.
        //     - Any other exception is wrapped in a new AggregateException.
        //     - If InternalAggregate completes but hands back a singular exception through
        //       its ref parameter, that exception is thrown as-is, without wrapping.
        //
        // Return Value:
        //     The single result of aggregation.
        //

        internal TResult Aggregate()
        {
            Exception singularException = null;
            TResult aggregate;

            try
            {
                aggregate = InternalAggregate(ref singularException);
            }
            catch (ThreadAbortException)
            {
                // Thread aborts are never wrapped.
                throw;
            }
            catch (AggregateException)
            {
                // Already an aggregate: rethrow the active exception untouched.
                throw;
            }
            catch (OperationCanceledException canceled)
            {
                //
                // The query itself has been canceled iff the exception carries the external
                // CancellationToken of this query and that token is actually in the canceled
                // state.  In that case the OperationCanceledException must not be wrapped.
                //
                if (canceled.CancellationToken == SpecifiedQuerySettings.CancellationState.ExternalCancellationToken
                    && SpecifiedQuerySettings.CancellationState.ExternalCancellationToken.IsCancellationRequested)
                {
                    throw;
                }

                // Cancellation that does not belong to this query is treated like any other failure.
                throw new AggregateException(canceled);
            }
            catch (Exception failure)
            {
                // Anything that is not an aggregate gets wrapped up and thrown as one.
                throw new AggregateException(failure);
            }

            // The aggregation asked for a singular exception to be thrown: honour that here,
            // outside the try block, so it is not wrapped by the handlers above.
            if (singularException != null)
            {
                throw singularException;
            }

            return aggregate;
        }

        //---------------------------------------------------------------------------------------
        // Operator-specific aggregation, supplied by each concrete operator.
        //
        // Arguments:
        //     singularExceptionToThrow - set this ref-param when Aggregate should throw a single
        //                                exception directly instead of an aggregated one
        //
        // Return Value:
        //     The single result of aggregation.
        //

        protected abstract TResult InternalAggregate(ref Exception singularExceptionToThrow);

        //---------------------------------------------------------------------------------------
        // Opens this operator: the child is opened first with the same settings and striping
        // preference, and its results are wrapped in a UnaryQueryOperatorResults for this
        // operator.
        //

        internal override QueryResults Open(
            QuerySettings settings, bool preferStriping)
        {
            QueryResults childResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Wraps every partition of the input stream with an enumerator obtained from
        // CreateEnumerator and passes the resulting stream on to the recipient.
        //
        // The output stream has the same number of partitions as the input, uses the default
        // comparer and is created with OrdinalIndexState.Correct.  Each enumerator receives the
        // merged cancellation token from the settings and a null shared-data object.  The
        // preferStriping argument is not consulted by this method.
        //

        internal override void WrapPartitionedStream(
            PartitionedStream inputStream, IPartitionedStreamRecipient recipient,
            bool preferStriping, QuerySettings settings)
        {
            int partitions = inputStream.PartitionCount;
            PartitionedStream wrapped = new PartitionedStream(
                partitions, Util.GetDefaultComparer(), OrdinalIndexState.Correct);

            int slot = 0;
            while (slot < partitions)
            {
                wrapped[slot] = CreateEnumerator(
                    slot, partitions, inputStream[slot], null, settings.CancellationState.MergedCancellationToken);
                slot++;
            }

            recipient.Receive(wrapped);
        }

        //---------------------------------------------------------------------------------------
        // Creates the operator-specific enumerator for one partition.
        //
        // Arguments (as supplied by WrapPartitionedStream):
        //     index             - index of the partition being wrapped
        //     count             - total number of partitions
        //     source            - the input partition to read from
        //     sharedData        - shared state object; WrapPartitionedStream passes null
        //     cancellationToken - the merged cancellation token of the query settings
        //

        protected abstract QueryOperatorEnumerator CreateEnumerator(
            int index, int count, QueryOperatorEnumerator source, object sharedData, CancellationToken cancellationToken);

        //---------------------------------------------------------------------------------------
        // Not supported for this operator: asserts and then throws NotSupportedException.
        //

        internal override IEnumerable AsSequentialQuery(CancellationToken token)
        {
            Contract.Assert(false, "This method should never be called. Associative aggregation can always be parallelized.");
            throw new NotSupportedException();
        }


        //----------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.  Always false for this operator.
        //

        internal override bool LimitsParallelism
        {
            get
            {
                return false;
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// InlinedAggregationOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
using System.Threading; 
 
namespace System.Linq.Parallel
{ 
    /// <summary>
    /// Shared base class for the "inlined" versions of the various aggregations.  Inlined
    /// operators perform their elementary operations with real MSIL instructions instead of
    /// general purpose delegate-based binary operators.  That is noticeably more efficient,
    /// at the price of some code duplication across the concrete operators.
    /// </summary>
    /// <remarks>
    /// NOTE(review): TResult is used below but no generic parameter list is visible on the
    /// class declaration in this listing; the type arguments look like they were stripped
    /// when the file was extracted -- confirm against the original source.
    /// </remarks>
    internal abstract class InlinedAggregationOperator :
        UnaryQueryOperator
    {

        //----------------------------------------------------------------------------------------
        // Constructs a new inlined aggregation operator on top of the given child.
        //
        // Arguments:
        //     child - the child data source; asserted to be non-null once the base
        //             constructor has run
        //

        internal InlinedAggregationOperator(IEnumerable child)
            : base(child)
        {
            Contract.Assert(child != null, "child data source cannot be null");
        }

        //---------------------------------------------------------------------------------------
        // Runs the operator-specific aggregation (InternalAggregate) and returns its result.
        //
        // Exception policy:
        //     - ThreadAbortException and AggregateException propagate unchanged.
        //     - An OperationCanceledException propagates unchanged only when it carries this
        //       query's external cancellation token and that token has cancellation requested.
        //     - Any other exception is wrapped in a new AggregateException.
        //     - If InternalAggregate completes but hands back a singular exception through
        //       its ref parameter, that exception is thrown as-is, without wrapping.
        //
        // Return Value:
        //     The single result of aggregation.
        //

        internal TResult Aggregate()
        {
            Exception singularException = null;
            TResult aggregate;

            try
            {
                aggregate = InternalAggregate(ref singularException);
            }
            catch (ThreadAbortException)
            {
                // Thread aborts are never wrapped.
                throw;
            }
            catch (AggregateException)
            {
                // Already an aggregate: rethrow the active exception untouched.
                throw;
            }
            catch (OperationCanceledException canceled)
            {
                //
                // The query itself has been canceled iff the exception carries the external
                // CancellationToken of this query and that token is actually in the canceled
                // state.  In that case the OperationCanceledException must not be wrapped.
                //
                if (canceled.CancellationToken == SpecifiedQuerySettings.CancellationState.ExternalCancellationToken
                    && SpecifiedQuerySettings.CancellationState.ExternalCancellationToken.IsCancellationRequested)
                {
                    throw;
                }

                // Cancellation that does not belong to this query is treated like any other failure.
                throw new AggregateException(canceled);
            }
            catch (Exception failure)
            {
                // Anything that is not an aggregate gets wrapped up and thrown as one.
                throw new AggregateException(failure);
            }

            // The aggregation asked for a singular exception to be thrown: honour that here,
            // outside the try block, so it is not wrapped by the handlers above.
            if (singularException != null)
            {
                throw singularException;
            }

            return aggregate;
        }

        //---------------------------------------------------------------------------------------
        // Operator-specific aggregation, supplied by each concrete operator.
        //
        // Arguments:
        //     singularExceptionToThrow - set this ref-param when Aggregate should throw a single
        //                                exception directly instead of an aggregated one
        //
        // Return Value:
        //     The single result of aggregation.
        //

        protected abstract TResult InternalAggregate(ref Exception singularExceptionToThrow);

        //---------------------------------------------------------------------------------------
        // Opens this operator: the child is opened first with the same settings and striping
        // preference, and its results are wrapped in a UnaryQueryOperatorResults for this
        // operator.
        //

        internal override QueryResults Open(
            QuerySettings settings, bool preferStriping)
        {
            QueryResults childResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Wraps every partition of the input stream with an enumerator obtained from
        // CreateEnumerator and passes the resulting stream on to the recipient.
        //
        // The output stream has the same number of partitions as the input, uses the default
        // comparer and is created with OrdinalIndexState.Correct.  Each enumerator receives the
        // merged cancellation token from the settings and a null shared-data object.  The
        // preferStriping argument is not consulted by this method.
        //

        internal override void WrapPartitionedStream(
            PartitionedStream inputStream, IPartitionedStreamRecipient recipient,
            bool preferStriping, QuerySettings settings)
        {
            int partitions = inputStream.PartitionCount;
            PartitionedStream wrapped = new PartitionedStream(
                partitions, Util.GetDefaultComparer(), OrdinalIndexState.Correct);

            int slot = 0;
            while (slot < partitions)
            {
                wrapped[slot] = CreateEnumerator(
                    slot, partitions, inputStream[slot], null, settings.CancellationState.MergedCancellationToken);
                slot++;
            }

            recipient.Receive(wrapped);
        }

        //---------------------------------------------------------------------------------------
        // Creates the operator-specific enumerator for one partition.
        //
        // Arguments (as supplied by WrapPartitionedStream):
        //     index             - index of the partition being wrapped
        //     count             - total number of partitions
        //     source            - the input partition to read from
        //     sharedData        - shared state object; WrapPartitionedStream passes null
        //     cancellationToken - the merged cancellation token of the query settings
        //

        protected abstract QueryOperatorEnumerator CreateEnumerator(
            int index, int count, QueryOperatorEnumerator source, object sharedData, CancellationToken cancellationToken);

        //---------------------------------------------------------------------------------------
        // Not supported for this operator: asserts and then throws NotSupportedException.
        //

        internal override IEnumerable AsSequentialQuery(CancellationToken token)
        {
            Contract.Assert(false, "This method should never be called. Associative aggregation can always be parallelized.");
            throw new NotSupportedException();
        }


        //----------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.  Always false for this operator.
        //

        internal override bool LimitsParallelism
        {
            get
            {
                return false;
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK