AssociativeAggregationOperator.cs source code in C# .NET

Source code from the .NET Framework reference source, in C#. Illustrative usage sketches follow the listing.

                        

Code:

Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / AssociativeAggregationOperator.cs

// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// AssociativeAggregationOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
using System.Threading; 
 
namespace System.Linq.Parallel
{ 
    /// <summary>
    /// The aggregation operator is a little unique, in that the enumerators it returns
    /// yield intermediate results instead of the final results. That's because there is
    /// one last Aggregate operation that must occur in order to perform the final reduction
    /// over the intermediate streams. In other words, the intermediate enumerators produced
    /// by this operator are never seen by other query operators or consumers directly.
    ///
    /// An aggregation performs parallel prefixing internally. Given a binary operator O,
    /// it will generate intermediate results by folding O across partitions; then it
    /// performs a final reduction by folding O across the intermediate results. The
    /// analysis engine knows about associativity and commutativity, and will ensure the
    /// style of partitioning inserted into the tree is compatible with the operator.
    ///
    /// For instance, say O is + (meaning it is AC), our input is {1,2,...,8}, and we
    /// use 4 partitions to calculate the aggregation. Sequentially this would look
    /// like this: O(O(O(1,2),...),8), in other words ((1+2)+...)+8. The parallel prefix
    /// of this (w/ 4 partitions) instead calculates the intermediate aggregations, i.e.:
    /// t1 = O(1,2), t2 = O(3,4), ... t4 = O(7,8), aka t1 = 1+2, t2 = 3+4, t4 = 7+8.
    /// The final step is to aggregate O over these intermediaries, i.e.
    /// O(O(O(t1,t2),t3),t4), or ((t1+t2)+t3)+t4. This generalizes to any binary operator.
    ///
    /// Because some aggregations use different input, intermediate, and output types,
    /// we support an even more generalized aggregation type. In this model, we have
    /// three operators: an intermediate (used for the incremental aggregations), a
    /// final (used for the final summary of intermediate results), and a result selector
    /// (used to perform whatever transformation is needed on the final summary).
    /// </summary>
    /// <typeparam name="TInput"></typeparam>
    /// <typeparam name="TIntermediate"></typeparam>
    /// <typeparam name="TOutput"></typeparam>
    internal sealed class AssociativeAggregationOperator<TInput, TIntermediate, TOutput> : UnaryQueryOperator<TInput, TIntermediate>
    {
 
        private readonly TIntermediate m_seed; // A seed used during aggregation. 
        private readonly bool m_seedIsSpecified; // Whether a seed was specified. If not, the first element will be used.
        private readonly bool m_throwIfEmpty; // Whether to throw an exception if the data source is empty. 

        // An intermediate reduction function.
        private Func<TIntermediate, TInput, TIntermediate> m_intermediateReduce;

        // A final reduction function.
        private Func<TIntermediate, TIntermediate, TIntermediate> m_finalReduce;

        // The result selector function.
        private Func<TIntermediate, TOutput> m_resultSelector;

        // A function that constructs seed instances.
        private Func<TIntermediate> m_seedFactory;
 
        //----------------------------------------------------------------------------------------
        // Constructs a new instance of an associative operator. 
        // 
        // Assumptions:
        //     This operator must be associative. 
        //

        internal AssociativeAggregationOperator(IEnumerable<TInput> child, TIntermediate seed, Func<TIntermediate> seedFactory, bool seedIsSpecified,
                                                Func<TIntermediate, TInput, TIntermediate> intermediateReduce,
                                                Func<TIntermediate, TIntermediate, TIntermediate> finalReduce,
                                                Func<TIntermediate, TOutput> resultSelector, bool throwIfEmpty, QueryAggregationOptions options)
            :base(child) 
        {
            Contract.Assert(child != null, "child data source cannot be null"); 
            Contract.Assert(intermediateReduce != null, "need an intermediate reduce function");
            Contract.Assert(finalReduce != null, "need a final reduce function");
            Contract.Assert(resultSelector != null, "need a result selector function");
            Contract.Assert(Enum.IsDefined(typeof(QueryAggregationOptions), options), "enum out of valid range"); 
            Contract.Assert((options & QueryAggregationOptions.Associative) == QueryAggregationOptions.Associative, "expected an associative operator");
            Contract.Assert(typeof(TIntermediate) == typeof(TInput) || seedIsSpecified, "seed must be specified if TIntermediate differs from TInput"); 
 
            m_seed = seed;
            m_seedFactory = seedFactory; 
            m_seedIsSpecified = seedIsSpecified;
            m_intermediateReduce = intermediateReduce;
            m_finalReduce = finalReduce;
            m_resultSelector = resultSelector; 
            m_throwIfEmpty = throwIfEmpty;
        } 
 
        //---------------------------------------------------------------------------------------
        // Executes the entire query tree, and aggregates the intermediate results into the 
        // final result based on the binary operators and final reduction.
        //
        // Return Value:
        //     The single result of aggregation. 
        //
 
        internal TOutput Aggregate() 
        {
            Contract.Assert(m_finalReduce != null); 
            Contract.Assert(m_resultSelector != null);

            TIntermediate accumulator = default(TIntermediate);
            bool hadElements = false; 

            // Because the final reduction is typically much cheaper than the intermediate 
            // reductions over the individual partitions, and because each parallel partition 
            // will do a lot of work to produce a single output element, we prefer to turn off
            // pipelining, and process the final reductions serially. 
            using (IEnumerator<TIntermediate> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
            {
                // We just reduce the elements in each output partition. If the operation is associative,
                // this will yield the correct answer. If not, we should never be calling this routine. 
                while (enumerator.MoveNext())
                { 
                    if (hadElements) 
                    {
                        // Accumulate results by passing the current accumulation and current element to 
                        // the reduction operation.
                        try
                        {
                            accumulator = m_finalReduce(accumulator, enumerator.Current); 
                        }
                        catch (ThreadAbortException) 
                        { 
                            // Do not wrap ThreadAbortExceptions
                            throw; 
                        }
                        catch (Exception ex)
                        {
                            // We need to wrap all exceptions into an aggregate. 
                            throw new AggregateException(ex);
                        } 
                    } 
                    else
                    { 
                        // This is the first element. Just set the accumulator to the first element.
                        accumulator = enumerator.Current;
                        hadElements = true;
                    } 
                }
 
                // If there were no elements, either throw or fall back to the seed, as configured.
                if (!hadElements)
                { 
                    if (m_throwIfEmpty)
                    {
                        throw new InvalidOperationException(SR.GetString(SR.NoElements));
                    } 
                    else
                    { 
                        accumulator = m_seedFactory == null ? m_seed : m_seedFactory(); 
                    }
                } 
            }

            // Finally, run the selection routine to yield the final element.
            try 
            {
                return m_resultSelector(accumulator); 
            } 
            catch (ThreadAbortException)
            { 
                // Do not wrap ThreadAbortExceptions
                throw;
            }
            catch (Exception ex) 
            {
                // We need to wrap all exceptions into an aggregate. 
                throw new AggregateException(ex); 
            }
        } 

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed. 
        //
 
        internal override QueryResults<TIntermediate> Open(QuerySettings settings, bool preferStriping)
        {
            // We just open the child operator. 
            QueryResults<TInput> childQueryResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }
 
        internal override void WrapPartitionedStream<TKey>(
            PartitionedStream<TInput, TKey> inputStream, IPartitionedStreamRecipient<TIntermediate> recipient,
            bool preferStriping, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;
            PartitionedStream<TIntermediate, int> outputStream = new PartitionedStream<TIntermediate, int>(
                partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);

            for (int i = 0; i < partitionCount; i++) 
            {
                outputStream[i] = new AssociativeAggregationOperatorEnumerator<TKey>(inputStream[i], this, i, settings.CancellationState.MergedCancellationToken);
            } 

            recipient.Receive(outputStream); 
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially. 
        //
 
        internal override IEnumerable<TIntermediate> AsSequentialQuery(CancellationToken token)
        {
            Contract.Assert(false, "This method should never be called. Associative aggregation can always be parallelized."); 
            throw new NotSupportedException();
        }

 
        //----------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge. 
        // 

        internal override bool LimitsParallelism 
        {
            get { return false; }
        }
 

        //--------------------------------------------------------------------------------------- 
        // This enumerator type encapsulates the intermediary aggregation over the underlying 
        // (possibly partitioned) data source.
        // 

        private class AssociativeAggregationOperatorEnumerator<TKey> : QueryOperatorEnumerator<TIntermediate, int>
        {
            private readonly QueryOperatorEnumerator<TInput, TKey> m_source; // The source data.
            private readonly AssociativeAggregationOperator<TInput, TIntermediate, TOutput> m_reduceOperator; // The operator.
            private readonly int m_partitionIndex; // The index of this partition. 
            private readonly CancellationToken m_cancellationToken; 
            private bool m_accumulated; // Whether we've accumulated already. (false-sharing risk, but only written once)
 

            //----------------------------------------------------------------------------------------
            // Instantiates a new aggregation operator enumerator.
            // 

            internal AssociativeAggregationOperatorEnumerator(QueryOperatorEnumerator<TInput, TKey> source,
                                                              AssociativeAggregationOperator<TInput, TIntermediate, TOutput> reduceOperator, int partitionIndex,
                                                              CancellationToken cancellationToken)
            { 
                Contract.Assert(source != null);
                Contract.Assert(reduceOperator != null);

                m_source = source; 
                m_reduceOperator = reduceOperator;
                m_partitionIndex = partitionIndex; 
                m_cancellationToken = cancellationToken; 
            }
 
            //----------------------------------------------------------------------------------------
            // The first time this API is called, it walks the entire source query tree. It begins
            // with an accumulator value set to the aggregation operator's seed, and always passes
            // the accumulator along with the current element from the data source to the binary 
            // intermediary aggregation operator. The return value is kept in the accumulator. At
            // the end, we will have our intermediate result, ready for final aggregation. 
            // 

            internal override bool MoveNext(ref TIntermediate currentElement, ref int currentKey) 
            {
                Contract.Assert(m_reduceOperator != null);
                Contract.Assert(m_reduceOperator.m_intermediateReduce != null, "expected a compiled operator");
 
                // Only produce a single element.  Return false if MoveNext() was already called before.
                if (m_accumulated) 
                { 
                    return false;
                } 
                m_accumulated = true;

                bool hadNext = false;
                TIntermediate accumulator = default(TIntermediate); 

                // Initialize the accumulator. 
                if (m_reduceOperator.m_seedIsSpecified) 
                {
                    // If the seed is specified, initialize accumulator to the seed value. 
                    accumulator = m_reduceOperator.m_seedFactory == null
                                      ? m_reduceOperator.m_seed
                                      : m_reduceOperator.m_seedFactory();
                } 
                else
                { 
                    // If the seed is not specified, then we take the first element as the seed. 
                    // Seed may be unspecified only if TInput is the same as TIntermediate.
                    Contract.Assert(typeof(TInput) == typeof(TIntermediate)); 

                    TInput acc = default(TInput);
                    TKey accKeyUnused = default(TKey);
                    if (!m_source.MoveNext(ref acc, ref accKeyUnused)) return false; 
                    hadNext = true;
                    accumulator = (TIntermediate)((object)acc); 
                } 

                // Scan through the source and accumulate the result. 
                TInput input = default(TInput);
                TKey keyUnused = default(TKey);
                int i = 0;
                while (m_source.MoveNext(ref input, ref keyUnused)) 
                {
                    if ((i++ & CancellationState.POLL_INTERVAL) == 0) 
                        CancellationState.ThrowIfCanceled(m_cancellationToken); 
                    hadNext = true;
                    accumulator = m_reduceOperator.m_intermediateReduce(accumulator, input); 
                }

                if (hadNext)
                { 
                    currentElement = accumulator;
                    currentKey = m_partitionIndex; // A reduction's "index" is just its partition number. 
                    return true; 
                }
 
                return false;
            }

            protected override void Dispose(bool disposing) 
            {
                Contract.Assert(m_source != null); 
                m_source.Dispose(); 
            }
        } 
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
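
The class comment's { 1, 2, ..., 8 } example with four partitions can be sketched outside of PLINQ's own machinery. The snippet below is a minimal illustration of the same idea, using Parallel.For for the per-partition folds and a serial fold for the final reduction; the ParallelPrefixSketch name and the fixed even partitioning are illustrative, not part of the file above.

using System;
using System.Linq;
using System.Threading.Tasks;

static class ParallelPrefixSketch
{
    static void Main()
    {
        int[] data = Enumerable.Range(1, 8).ToArray();   // { 1, 2, ..., 8 }
        Func<int, int, int> op = (a, b) => a + b;        // O is +, associative and commutative

        const int partitions = 4;
        int chunk = data.Length / partitions;
        int[] intermediates = new int[partitions];

        // Intermediate step: each partition folds O over its own slice.
        Parallel.For(0, partitions, p =>
        {
            int acc = data[p * chunk];
            for (int i = p * chunk + 1; i < (p + 1) * chunk; i++)
                acc = op(acc, data[i]);
            intermediates[p] = acc;                      // t1 = 3, t2 = 7, t3 = 11, t4 = 15
        });

        // Final step: fold O over the intermediates serially: ((t1 + t2) + t3) + t4.
        Console.WriteLine(intermediates.Aggregate(op));  // 36
    }
}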
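
The intermediate/final/result-selector model the class comment describes also covers aggregations whose input, intermediate, and output types differ. Below is a small sequential sketch of that shape, using an average as the example: TInput is int, TIntermediate is a (sum, count) pair, and TOutput is double. The average example and the value-tuple accumulator are illustrative additions, not code from this operator.

using System;

static class ThreeFunctionModelSketch
{
    static void Main()
    {
        // Intermediate reduce: folds one input element into a partition's accumulator.
        Func<(long Sum, long Count), int, (long Sum, long Count)> intermediateReduce =
            (acc, x) => (Sum: acc.Sum + x, Count: acc.Count + 1);

        // Final reduce: merges two per-partition summaries.
        Func<(long Sum, long Count), (long Sum, long Count), (long Sum, long Count)> finalReduce =
            (a, b) => (Sum: a.Sum + b.Sum, Count: a.Count + b.Count);

        // Result selector: transforms the final summary into the output type.
        Func<(long Sum, long Count), double> resultSelector =
            acc => (double)acc.Sum / acc.Count;

        // Pretend two partitions saw these inputs.
        int[] partition0 = { 1, 2, 3 };
        int[] partition1 = { 4, 5 };

        var t0 = (Sum: 0L, Count: 0L);
        foreach (int x in partition0) t0 = intermediateReduce(t0, x);

        var t1 = (Sum: 0L, Count: 0L);
        foreach (int x in partition1) t1 = intermediateReduce(t1, x);

        Console.WriteLine(resultSelector(finalReduce(t0, t1)));  // 3, the average of 1..5
    }
}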
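
For reference, the usual way user code reaches an aggregation of this shape is the PLINQ Aggregate overload that takes a seed, an element function, a combine function, and a result selector. The comments map each argument onto the operator's fields; whether this particular overload constructs this exact operator is an internal detail of the build above, so treat the mapping as indicative rather than definitive.

using System;
using System.Linq;

static class PublicSurfaceSketch
{
    static void Main()
    {
        double average = Enumerable.Range(1, 1000)
            .AsParallel()
            .Aggregate(
                (Sum: 0L, Count: 0L),                                         // seed             -> m_seed
                (acc, x) => (Sum: acc.Sum + x, Count: acc.Count + 1),         // element function -> m_intermediateReduce
                (a, b) => (Sum: a.Sum + b.Sum, Count: a.Count + b.Count),     // combine function -> m_finalReduce
                acc => (double)acc.Sum / acc.Count);                          // result selector  -> m_resultSelector

        Console.WriteLine(average);  // 500.5
    }
}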
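
The MoveNext loop above polls for cancellation only periodically, using a bitmask test against CancellationState.POLL_INTERVAL rather than checking the token on every element. A stand-alone sketch of the same pattern follows; PollMask is a stand-in value, on the assumption that POLL_INTERVAL is a power-of-two-minus-one mask, and ThrowIfCancellationRequested stands in for CancellationState.ThrowIfCanceled.

using System;
using System.Collections.Generic;
using System.Threading;

static class CancellationPollSketch
{
    // Stand-in for CancellationState.POLL_INTERVAL (assumed to be a 2^n - 1 mask).
    const int PollMask = 63;

    static long Sum(IEnumerable<int> source, CancellationToken token)
    {
        long acc = 0;
        int i = 0;
        foreach (int x in source)
        {
            // Check the token only once every PollMask + 1 elements to keep the
            // common path cheap, mirroring the (i++ & POLL_INTERVAL) == 0 test above.
            if ((i++ & PollMask) == 0)
                token.ThrowIfCancellationRequested();
            acc += x;
        }
        return acc;
    }

    static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            Console.WriteLine(Sum(new[] { 1, 2, 3, 4 }, cts.Token));  // 10
        }
    }
}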

                        
