// Code:
// 4.0/4.0/DEVDIV_TFS/Dev10/Releases/RTMRel/ndp/fx/src/Core/System/Linq/Parallel/QueryOperators/Unary/GroupByQueryOperator.cs/1305376/GroupByQueryOperator.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// GroupByQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
using IEnumerator=System.Collections.IEnumerator;
namespace System.Linq.Parallel
{
/// <summary>
/// The operator type for GroupBy statements. This operator groups the input based on
/// a key-selection routine, yielding one-to-many values of key-to-elements. The
/// implementation is very much like the hash join operator, in which we first build
/// a big hashtable of the input; then we just iterate over each unique key in the
/// hashtable, yielding it plus all of the elements with the same key.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type-argument lists in this listing appear to have been
/// stripped (see the unbalanced closing angle bracket on the base type and the use of
/// TSource / TElement below without a visible declaration) -- confirm against the
/// original source before compiling.
/// </remarks>
internal sealed class GroupByQueryOperator :
UnaryQueryOperator>
{
private readonly Func m_keySelector; // Key selection function.
private readonly Func m_elementSelector; // Optional element selection function; null selects the identity code paths below.
private readonly IEqualityComparer m_keyComparer; // An optional key comparison object.
//----------------------------------------------------------------------------------------
// Initializes a new group by operator.
//
// Arguments:
// child - the child operator or data source from which to pull data
// keySelector - a delegate representing the key selector function
// elementSelector - a delegate representing the element selector function
// keyComparer - an optional key comparison routine
//
// Assumptions:
// child must be non null.
// keySelector must be non null.
// elementSelector may be null only when TSource and TElement are the same type
// (this is what the third assertion below checks).
//
// Side effects:
// Marks this operator's ordinal index state as Shuffled.
//
internal GroupByQueryOperator(IEnumerable child,
Func keySelector,
Func elementSelector,
IEqualityComparer keyComparer)
:base(child)
{
Contract.Assert(child != null, "child data source cannot be null");
Contract.Assert(keySelector != null, "need a selector function");
Contract.Assert(elementSelector != null ||
typeof(TSource) == typeof(TElement), "need an element function if TSource!=TElement");
m_keySelector = keySelector;
m_elementSelector = elementSelector;
m_keyComparer = keyComparer;
SetOrdinalIndexState(OrdinalIndexState.Shuffled);
}
//---------------------------------------------------------------------------------------
// Hash-repartitions the input stream on the grouping key and hands the resulting
// stream to one of the two helpers below, which wrap each partition in a grouping
// enumerator and pass the output to the recipient.
//
// The ordered helper is chosen when the child reports OutputOrdered; otherwise the
// unordered helper is used. The preferStriping argument is not read by this method.
// The fourth argument to both repartitioning calls is passed as null; its meaning is
// not visible in this file.
//
internal override void WrapPartitionedStream(
PartitionedStream inputStream, IPartitionedStreamRecipient> recipient,
bool preferStriping, QuerySettings settings)
{
// Hash-repartition the source stream
if (Child.OutputOrdered)
{
WrapPartitionedStreamHelperOrdered(
ExchangeUtilities.HashRepartitionOrdered(
inputStream, m_keySelector, m_keyComparer, null, settings.CancellationState.MergedCancellationToken),
recipient,
settings.CancellationState.MergedCancellationToken
);
}
else
{
WrapPartitionedStreamHelper(
ExchangeUtilities.HashRepartition(
inputStream, m_keySelector, m_keyComparer, null, settings.CancellationState.MergedCancellationToken),
recipient,
settings.CancellationState.MergedCancellationToken
);
}
}
//---------------------------------------------------------------------------------------
// This is a helper method. WrapPartitionedStream decides what type TKey is going
// to be, and then call this method with that key as a generic parameter.
//
// Unordered variant: builds an output stream with the same partition count and key
// comparer as the hash stream (index state Shuffled), wraps every partition in a
// GroupBy enumerator, and delivers the output stream to the recipient.
//
private void WrapPartitionedStreamHelper(
PartitionedStream, TKey> hashStream,
IPartitionedStreamRecipient> recipient,
CancellationToken cancellationToken)
{
int partitionCount = hashStream.PartitionCount;
PartitionedStream, TKey> outputStream =
new PartitionedStream, TKey>(partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled);
// If there is no element selector, we return a special identity enumerator. Otherwise,
// we return one that will apply the element selection function during enumeration.
for (int i = 0; i < partitionCount; i++)
{
if (m_elementSelector == null)
{
Contract.Assert(typeof(TSource) == typeof(TElement));
var enumerator = new GroupByIdentityQueryOperatorEnumerator(
hashStream[i], m_keyComparer, cancellationToken);
// The cast goes through object; presumably the compiler cannot prove that TSource
// and TElement are the same type even though the assertion above requires it.
outputStream[i] = (QueryOperatorEnumerator, TKey>)(object)enumerator;
}
else
{
outputStream[i] = new GroupByElementSelectorQueryOperatorEnumerator(
hashStream[i], m_keyComparer, m_elementSelector, cancellationToken);
}
}
recipient.Receive(outputStream);
}
//---------------------------------------------------------------------------------------
// This is a helper method. WrapPartitionedStream decides what type TKey is going
// to be, and then call this method with that key as a generic parameter.
//
// Ordered variant: same shape as WrapPartitionedStreamHelper, but each partition is
// wrapped in an ordered GroupBy enumerator that also receives the key selector and the
// hash stream's key comparer (used as the comparer for order keys).
//
private void WrapPartitionedStreamHelperOrdered(
PartitionedStream, TKey> hashStream,
IPartitionedStreamRecipient> recipient,
CancellationToken cancellationToken)
{
int partitionCount = hashStream.PartitionCount;
PartitionedStream, TKey> outputStream =
new PartitionedStream, TKey>(partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled);
// If there is no element selector, we return a special identity enumerator. Otherwise,
// we return one that will apply the element selection function during enumeration.
IComparer orderComparer = hashStream.KeyComparer;
for (int i = 0; i < partitionCount; i++)
{
if (m_elementSelector == null)
{
Contract.Assert(typeof(TSource) == typeof(TElement));
var enumerator = new OrderedGroupByIdentityQueryOperatorEnumerator(
hashStream[i], m_keySelector, m_keyComparer, orderComparer, cancellationToken);
// Same cast-through-object reinterpretation as in the unordered helper.
outputStream[i] = (QueryOperatorEnumerator, TKey>)(object)enumerator;
}
else
{
outputStream[i] = new OrderedGroupByElementSelectorQueryOperatorEnumerator(
hashStream[i], m_keySelector, m_elementSelector, m_keyComparer, orderComparer,
cancellationToken);
}
}
recipient.Receive(outputStream);
}
//-----------------------------------------------------------------------------------
// Override of the query operator base class's Open method.
//
// Opens the child with preferStriping forced to false and wraps the child's results
// together with this operator. The trailing 'false' passed to UnaryQueryOperatorResults
// is presumably also a preferStriping flag -- TODO confirm against that type.
//
internal override QueryResults> Open(QuerySettings settings, bool preferStriping)
{
// We just open our child operator. Do not propagate the preferStriping value, but instead explicitly
// set it to false. Regardless of whether the parent prefers striping or range partitioning, the output
// will be hash-partitioned.
QueryResults childResults = Child.Open(settings, false);
return new UnaryQueryOperatorResults(childResults, this, settings, false);
}
//----------------------------------------------------------------------------------------
// Returns an enumerable that represents the query executing sequentially.
//
// The child's sequential query is wrapped with CancellableEnumerable.Wrap using the
// supplied token, and GroupBy is then called on the wrapped sequence. Without an element
// selector the two-argument GroupBy overload is used and its result is cast through object.
//
internal override IEnumerable> AsSequentialQuery(CancellationToken token)
{
IEnumerable wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token);
if (m_elementSelector == null)
{
Contract.Assert(typeof(TElement) == typeof(TSource));
return (IEnumerable>)(object)(wrappedChild.GroupBy(m_keySelector, m_keyComparer));
}
else
{
return wrappedChild.GroupBy(m_keySelector, m_elementSelector, m_keyComparer);
}
}
//---------------------------------------------------------------------------------------
// Whether this operator performs a premature merge. Always false for GroupBy.
//
internal override bool LimitsParallelism
{
get { return false; }
}
}
//----------------------------------------------------------------------------------------
// The enumerator type responsible for grouping elements and yielding the key-value sets.
//
// This abstract base owns the enumeration over the finished lookup; derived classes
// supply BuildHashLookup, which produces the lookup from m_source.
//
// Assumptions:
// Just like the Join operator, this won't work properly at all if the analysis engine
// didn't choose to hash partition. We will simply not yield correct groupings.
//
// NOTE(review): generic type-argument lists appear to have been stripped from this
// listing -- confirm declarations against the original source.
//
internal abstract class GroupByQueryOperatorEnumerator :
QueryOperatorEnumerator, TOrderKey>
{
protected readonly QueryOperatorEnumerator, TOrderKey> m_source; // The data source to enumerate.
protected readonly IEqualityComparer m_keyComparer; // A key comparer.
protected readonly CancellationToken m_cancellationToken; // Token available to derived classes for cancellation checks.
private Mutables m_mutables; // All of the mutable state; null until the first MoveNext call.
// Holder for the lazily created enumeration state.
class Mutables
{
internal HashLookup, ListChunk> m_hashLookup; // The lookup with key-value mappings.
internal int m_hashLookupIndex; // The current index within the lookup.
}
//----------------------------------------------------------------------------------------
// Instantiates a new group by enumerator.
//
// Arguments:
// source - the enumerator to pull elements from; asserted non null
// keyComparer - comparer stored for use by derived classes
// cancellationToken - token stored for use by derived classes
//
protected GroupByQueryOperatorEnumerator(
QueryOperatorEnumerator, TOrderKey> source,
IEqualityComparer keyComparer, CancellationToken cancellationToken)
{
Contract.Assert(source != null);
m_source = source;
m_keyComparer = keyComparer;
m_cancellationToken = cancellationToken;
}
//---------------------------------------------------------------------------------------
// MoveNext will invoke the entire query sub-tree, accumulating results into a hash-
// table, upon the first call. Then for the first call and all subsequent calls, we will
// just enumerate the key-set from the hash-table, retrieving groupings of key-elements.
//
// Returns true and sets currentElement to a new grouping while lookup entries remain;
// returns false afterwards. currentKey is never assigned by this method.
//
internal override bool MoveNext(ref IGrouping currentElement, ref TOrderKey currentKey)
{
Contract.Assert(m_source != null);
// Lazy-init the mutable state. This also means we haven't yet built our lookup of
// groupings, so we can go ahead and do that too.
Mutables mutables = m_mutables;
if (mutables == null)
{
mutables = m_mutables = new Mutables();
// Build the hash lookup and start enumerating the lookup at the beginning.
mutables.m_hashLookup = BuildHashLookup();
Contract.Assert(mutables.m_hashLookup != null);
// Start one before the first entry; the pre-increment below moves to index 0.
mutables.m_hashLookupIndex = -1;
}
// Now, with a hash lookup in hand, we just enumerate the keys. So long
// as the key-value lookup has elements, we have elements.
if (++mutables.m_hashLookupIndex < mutables.m_hashLookup.Count)
{
// Wrap the key/value pair at the current index in a grouping object.
currentElement = new GroupByGrouping(
mutables.m_hashLookup[mutables.m_hashLookupIndex]);
return true;
}
return false;
}
//------------------------------------------------------------------------------------
// Builds the hash lookup, transforming from TSource to TElement through whatever means is appropriate.
// Called once, from the first MoveNext call; must not return null (asserted there).
//
protected abstract HashLookup, ListChunk> BuildHashLookup();
//------------------------------------------------------------------------------------
// Disposes the source enumerator. The 'disposing' argument is not consulted.
//
protected override void Dispose(bool disposing)
{
m_source.Dispose();
}
}
//---------------------------------------------------------------------------------------
// A specialization of the group by enumerator for yielding elements with the identity
// function: each source element is stored in its group unchanged.
//
internal sealed class GroupByIdentityQueryOperatorEnumerator :
GroupByQueryOperatorEnumerator
{
//---------------------------------------------------------------------------------------
// Instantiates a new group by enumerator. All arguments are forwarded to the base class.
//
internal GroupByIdentityQueryOperatorEnumerator(
QueryOperatorEnumerator, TOrderKey> source,
IEqualityComparer keyComparer, CancellationToken cancellationToken)
: base(source, keyComparer, cancellationToken)
{
}
//-----------------------------------------------------------------------------------
// Builds the hash lookup by draining m_source: every element is appended to the chunk
// list stored under its key, creating that list the first time a key is seen.
// The order key produced by the source is read but not used.
//
protected override HashLookup, ListChunk> BuildHashLookup()
{
HashLookup, ListChunk> hashlookup =
new HashLookup, ListChunk>(new WrapperEqualityComparer(m_keyComparer));
Pair sourceElement = default(Pair);
TOrderKey sourceKeyUnused = default(TOrderKey);
int i = 0;
while (m_source.MoveNext(ref sourceElement, ref sourceKeyUnused))
{
// Periodic cancellation check: only polls when the masked counter is zero.
// Presumably POLL_INTERVAL is a (2^k - 1) bit mask -- TODO confirm in CancellationState.
if ((i++ & CancellationState.POLL_INTERVAL) == 0)
CancellationState.ThrowIfCanceled(m_cancellationToken);
// Wrap the key carried in the second half of the pair for use as the lookup key.
Wrapper key = new Wrapper(sourceElement.Second);
// If the key already exists, we reuse its existing list --
// otherwise we create a new list and register it under the key.
ListChunk currentValue = null;
if (!hashlookup.TryGetValue(key, ref currentValue))
{
const int INITIAL_CHUNK_SIZE = 2;
currentValue = new ListChunk(INITIAL_CHUNK_SIZE);
hashlookup.Add(key, currentValue);
}
Contract.Assert(currentValue != null);
// Append the source element itself (first half of the pair) to the group's list.
currentValue.Add(sourceElement.First);
}
return hashlookup;
}
}
//----------------------------------------------------------------------------------------
// A specialization of the group by enumerator for yielding elements with any arbitrary
// element selection function: each source element is projected through the selector
// before being stored in its group.
//
internal sealed class GroupByElementSelectorQueryOperatorEnumerator :
GroupByQueryOperatorEnumerator
{
private readonly Func m_elementSelector; // Function to select elements.
//---------------------------------------------------------------------------------------
// Instantiates a new group by enumerator.
//
// Arguments:
// source, keyComparer, cancellationToken - forwarded to the base class
// elementSelector - projection applied to each source element; asserted non null
//
internal GroupByElementSelectorQueryOperatorEnumerator(
QueryOperatorEnumerator, TOrderKey> source,
IEqualityComparer keyComparer, Func elementSelector, CancellationToken cancellationToken) :
base(source, keyComparer, cancellationToken)
{
Contract.Assert(elementSelector != null);
m_elementSelector = elementSelector;
}
//------------------------------------------------------------------------------------
// Builds the hash lookup by draining m_source: every element is projected with
// m_elementSelector and appended to the chunk list stored under its key, creating that
// list the first time a key is seen. The order key produced by the source is not used.
//
protected override HashLookup, ListChunk> BuildHashLookup()
{
HashLookup, ListChunk> hashlookup =
new HashLookup, ListChunk>(new WrapperEqualityComparer(m_keyComparer));
Pair sourceElement = default(Pair);
TOrderKey sourceKeyUnused = default(TOrderKey);
int i = 0;
while (m_source.MoveNext(ref sourceElement, ref sourceKeyUnused))
{
// Periodic cancellation check: only polls when the masked counter is zero.
// Presumably POLL_INTERVAL is a (2^k - 1) bit mask -- TODO confirm in CancellationState.
if ((i++ & CancellationState.POLL_INTERVAL) == 0)
CancellationState.ThrowIfCanceled(m_cancellationToken);
// Wrap the key carried in the second half of the pair for use as the lookup key.
Wrapper key = new Wrapper(sourceElement.Second);
// If the key already exists, we reuse its existing list --
// otherwise we create a new list and register it under the key.
ListChunk currentValue = null;
if (!hashlookup.TryGetValue(key, ref currentValue))
{
const int INITIAL_CHUNK_SIZE = 2;
currentValue = new ListChunk(INITIAL_CHUNK_SIZE);
hashlookup.Add(key, currentValue);
}
Contract.Assert(currentValue != null);
// Project the source element through the selector and append the result to the group's list.
currentValue.Add(m_elementSelector(sourceElement.First));
}
return hashlookup;
}
}
//----------------------------------------------------------------------------------------
// Ordered version of the GroupBy operator.
//
// This abstract base owns the enumeration over the finished lookup and yields, for each
// group, both the grouping and the order key recorded for it. Derived classes supply
// BuildHashLookup.
//
// NOTE(review): generic type-argument lists appear to have been stripped from this
// listing -- confirm declarations against the original source.
//
internal abstract class OrderedGroupByQueryOperatorEnumerator :
QueryOperatorEnumerator, TOrderKey>
{
protected readonly QueryOperatorEnumerator, TOrderKey> m_source; // The data source to enumerate.
private readonly Func m_keySelector; // The key selection routine. Only referenced by assertions in this class.
protected readonly IEqualityComparer m_keyComparer; // The key comparison routine.
protected readonly IComparer m_orderComparer; // The comparison routine for order keys.
protected readonly CancellationToken m_cancellationToken; // Token available to derived classes for cancellation checks.
private Mutables m_mutables; // All the mutable state; null until the first MoveNext call.
// Holder for the lazily created enumeration state.
class Mutables
{
internal HashLookup, GroupKeyData> m_hashLookup; // The lookup with key-value mappings.
internal int m_hashLookupIndex; // The current index within the lookup.
}
//---------------------------------------------------------------------------------------
// Instantiates a new group by enumerator.
//
// Arguments:
// source - the enumerator to pull elements from; asserted non null
// keySelector - the key selection routine; asserted non null
// keyComparer - comparer for grouping keys, stored for derived classes
// orderComparer - comparer for order keys, stored for derived classes
// cancellationToken - token stored for derived classes
//
protected OrderedGroupByQueryOperatorEnumerator(QueryOperatorEnumerator, TOrderKey> source,
Func keySelector, IEqualityComparer keyComparer, IComparer orderComparer,
CancellationToken cancellationToken)
{
Contract.Assert(source != null);
Contract.Assert(keySelector != null);
m_source = source;
m_keySelector = keySelector;
m_keyComparer = keyComparer;
m_orderComparer = orderComparer;
m_cancellationToken = cancellationToken;
}
//----------------------------------------------------------------------------------------
// MoveNext will invoke the entire query sub-tree, accumulating results into a hash-
// table, upon the first call. Then for the first call and all subsequent calls, we will
// just enumerate the key-set from the hash-table, retrieving groupings of key-elements.
//
// Returns true while lookup entries remain, setting currentElement to the entry's
// grouping and currentKey to the entry's order key; returns false afterwards.
//
internal override bool MoveNext(ref IGrouping currentElement, ref TOrderKey currentKey)
{
Contract.Assert(m_source != null);
Contract.Assert(m_keySelector != null);
// Lazy-init the mutable state. This also means we haven't yet built our lookup of
// groupings, so we can go ahead and do that too.
Mutables mutables = m_mutables;
if (mutables == null)
{
mutables = m_mutables = new Mutables();
// Build the hash lookup and start enumerating the lookup at the beginning.
mutables.m_hashLookup = BuildHashLookup();
Contract.Assert(mutables.m_hashLookup != null);
// Start one before the first entry; the pre-increment below moves to index 0.
mutables.m_hashLookupIndex = -1;
}
// Now, with a hash lookup in hand, we just enumerate the keys. So long
// as the key-value lookup has elements, we have elements.
if (++mutables.m_hashLookupIndex < mutables.m_hashLookup.Count)
{
GroupKeyData value = mutables.m_hashLookup[mutables.m_hashLookupIndex].Value;
currentElement = value.m_grouping;
currentKey = value.m_orderKey;
return true;
}
return false;
}
//-----------------------------------------------------------------------------------
// Builds the hash lookup, transforming from TSource to TElement through whatever means is appropriate.
// Called once, from the first MoveNext call; must not return null (asserted there).
//
protected abstract HashLookup, GroupKeyData> BuildHashLookup();
//-----------------------------------------------------------------------------------
// Disposes the source enumerator. The 'disposing' argument is not consulted.
//
protected override void Dispose(bool disposing)
{
m_source.Dispose();
}
//-----------------------------------------------------------------------------------
// A data structure that holds information about elements with a particular key.
//
// This information includes two parts:
// - An order key for the grouping.
// - The grouping itself. The grouping consists of elements and the grouping key.
//
protected class GroupKeyData
{
internal TOrderKey m_orderKey; // Order key for the group; mutable so builders can replace it.
internal OrderedGroupByGrouping m_grouping; // The group's elements and key.
// Records the initial order key and creates an empty grouping for hashKey that
// will use orderComparer for its order keys.
internal GroupKeyData(TOrderKey orderKey, TGroupKey hashKey, IComparer orderComparer)
{
m_orderKey = orderKey;
m_grouping = new OrderedGroupByGrouping(hashKey, orderComparer);
}
}
}
//---------------------------------------------------------------------------------------
// A specialization of the ordered GroupBy enumerator for yielding elements with the identity
// function: each source element is stored in its group unchanged.
//
internal sealed class OrderedGroupByIdentityQueryOperatorEnumerator :
OrderedGroupByQueryOperatorEnumerator
{
//----------------------------------------------------------------------------------------
// Instantiates a new group by enumerator. All arguments are forwarded to the base class.
//
internal OrderedGroupByIdentityQueryOperatorEnumerator(QueryOperatorEnumerator, TOrderKey> source,
Func keySelector, IEqualityComparer keyComparer, IComparer orderComparer,
CancellationToken cancellationToken)
: base(source, keySelector, keyComparer, orderComparer, cancellationToken)
{
}
//-----------------------------------------------------------------------------------
// Builds the hash lookup by draining m_source. Each element is added, together with
// its order key, to the grouping stored under its key. The group's own order key is
// kept at the smallest order key seen for that group according to m_orderComparer.
// After the source is exhausted every grouping is told that adding is finished.
//
protected override HashLookup, GroupKeyData> BuildHashLookup()
{
HashLookup, GroupKeyData> hashLookup = new HashLookup, GroupKeyData>(
new WrapperEqualityComparer(m_keyComparer));
Pair sourceElement = default(Pair);
TOrderKey sourceOrderKey = default(TOrderKey);
int i = 0;
while (m_source.MoveNext(ref sourceElement, ref sourceOrderKey))
{
// Periodic cancellation check: only polls when the masked counter is zero.
// Presumably POLL_INTERVAL is a (2^k - 1) bit mask -- TODO confirm in CancellationState.
if ((i++ & CancellationState.POLL_INTERVAL) == 0)
CancellationState.ThrowIfCanceled(m_cancellationToken);
// Wrap the key carried in the second half of the pair for use as the lookup key.
Wrapper key = new Wrapper(sourceElement.Second);
// If the key already exists, we reuse its group data (lowering the group's order
// key if this element's order key compares smaller) --
// otherwise we create new group data seeded with this element's order key.
GroupKeyData currentValue = null;
if (hashLookup.TryGetValue(key, ref currentValue))
{
if (m_orderComparer.Compare(sourceOrderKey, currentValue.m_orderKey) < 0)
{
currentValue.m_orderKey = sourceOrderKey;
}
}
else
{
currentValue = new GroupKeyData(sourceOrderKey, key.Value, m_orderComparer);
hashLookup.Add(key, currentValue);
}
Contract.Assert(currentValue != null);
// Store the source element itself along with its order key.
currentValue.m_grouping.Add(sourceElement.First, sourceOrderKey);
}
// Sort the elements within each group
for (int j = 0; j < hashLookup.Count; j++)
{
hashLookup[j].Value.m_grouping.DoneAdding();
}
return hashLookup;
}
}
//----------------------------------------------------------------------------------------
// A specialization of the ordered GroupBy enumerator for yielding elements with any arbitrary
// element selection function: each source element is projected through the selector
// before being stored in its group.
//
internal sealed class OrderedGroupByElementSelectorQueryOperatorEnumerator :
OrderedGroupByQueryOperatorEnumerator
{
private readonly Func m_elementSelector; // Function to select elements.
//----------------------------------------------------------------------------------------
// Instantiates a new group by enumerator.
//
// Arguments:
// source, keySelector, keyComparer, orderComparer, cancellationToken
// - forwarded to the base class
// elementSelector - projection applied to each source element; asserted non null
//
internal OrderedGroupByElementSelectorQueryOperatorEnumerator(QueryOperatorEnumerator, TOrderKey> source,
Func keySelector, Func elementSelector, IEqualityComparer keyComparer, IComparer orderComparer,
CancellationToken cancellationToken) :
base(source, keySelector, keyComparer, orderComparer, cancellationToken)
{
Contract.Assert(elementSelector != null);
m_elementSelector = elementSelector;
}
//-----------------------------------------------------------------------------------
// Builds the hash lookup by draining m_source. Each element is projected with
// m_elementSelector and added, together with its order key, to the grouping stored
// under its key. The group's own order key is kept at the smallest order key seen for
// that group according to m_orderComparer. After the source is exhausted every
// grouping is told that adding is finished.
//
protected override HashLookup, GroupKeyData> BuildHashLookup()
{
HashLookup, GroupKeyData> hashLookup = new HashLookup, GroupKeyData>(
new WrapperEqualityComparer(m_keyComparer));
Pair sourceElement = default(Pair);
TOrderKey sourceOrderKey = default(TOrderKey);
int i = 0;
while (m_source.MoveNext(ref sourceElement, ref sourceOrderKey))
{
// Periodic cancellation check: only polls when the masked counter is zero.
// Presumably POLL_INTERVAL is a (2^k - 1) bit mask -- TODO confirm in CancellationState.
if ((i++ & CancellationState.POLL_INTERVAL) == 0)
CancellationState.ThrowIfCanceled(m_cancellationToken);
// Wrap the key carried in the second half of the pair for use as the lookup key.
Wrapper key = new Wrapper(sourceElement.Second);
// If the key already exists, we reuse its group data (lowering the group's order
// key if this element's order key compares smaller) --
// otherwise we create new group data seeded with this element's order key.
GroupKeyData currentValue = null;
if (hashLookup.TryGetValue(key, ref currentValue))
{
if (m_orderComparer.Compare(sourceOrderKey, currentValue.m_orderKey) < 0)
{
currentValue.m_orderKey = sourceOrderKey;
}
}
else
{
currentValue = new GroupKeyData(sourceOrderKey, key.Value, m_orderComparer);
hashLookup.Add(key, currentValue);
}
Contract.Assert(currentValue != null);
// Project the source element through the selector and store the result with its order key.
currentValue.m_grouping.Add(m_elementSelector(sourceElement.First), sourceOrderKey);
}
// Sort the elements within each group
for (int j = 0; j < hashLookup.Count; j++)
{
hashLookup[j].Value.m_grouping.DoneAdding();
}
return hashLookup;
}
}
//----------------------------------------------------------------------------------------
// This little type implements the IGrouping interface, and exposes a single
// key-to-many-values mapping. It is a thin view over a key/value pair whose value is
// the list of elements for that key.
//
// NOTE(review): generic type-argument lists appear to have been stripped from this
// listing; as a result the two GetEnumerator implementations below look identical.
// Presumably the first is the generic one and the second the non-generic one -- confirm.
//
internal class GroupByGrouping : IGrouping
{
private KeyValuePair, ListChunk> m_keyValues; // A key value pair.
//---------------------------------------------------------------------------------------
// Constructs a new, empty grouping for the given key.
//
internal GroupByGrouping(TGroupKey key)
{
const int INITIAL_CHUNK_SIZE = 2;
m_keyValues =
new KeyValuePair, ListChunk>(
new Wrapper(key), new ListChunk(INITIAL_CHUNK_SIZE));
}
//---------------------------------------------------------------------------------------
// Constructs a new grouping out of an existing key value pair. The pair's value
// (the element list) is asserted non null.
//
internal GroupByGrouping(KeyValuePair, ListChunk> keyValues)
{
Contract.Assert(keyValues.Value != null);
m_keyValues = keyValues;
}
//---------------------------------------------------------------------------------------
// Adds an element into the grouping.
//
internal void Add(TElement element)
{
m_keyValues.Value.Add(element);
}
//---------------------------------------------------------------------------------------
// The key this mapping represents (unwrapped from the stored key wrapper).
//
TGroupKey IGrouping.Key
{
get
{
return m_keyValues.Key.Value;
}
}
//----------------------------------------------------------------------------------------
// Access to value enumerators. Returns the element list's own enumerator.
//
IEnumerator IEnumerable.GetEnumerator()
{
Contract.Assert(m_keyValues.Value != null);
return m_keyValues.Value.GetEnumerator();
}
// Forwards to the IEnumerable implementation on this same object.
IEnumerator IEnumerable.GetEnumerator()
{
return ((IEnumerable)this).GetEnumerator();
}
}
/// <summary>
/// An ordered version of the grouping data structure. Represents an ordered group of elements that
/// have the same grouping key.
/// </summary>
/// <remarks>
/// Elements and their order keys are collected in two parallel arrays via Add; DoneAdding
/// then sorts the elements by their order keys.
/// NOTE(review): generic type-argument lists appear to have been stripped from this listing;
/// as a result the two GetEnumerator implementations below look identical. Presumably the
/// first is the generic one and the second the non-generic one -- confirm.
/// </remarks>
internal class OrderedGroupByGrouping : IGrouping
{
private TGroupKey m_groupKey; // The group key for this grouping
private GrowingArray m_values; // Values in this group
private GrowingArray m_orderKeys; // Order keys that correspond to the values
private IComparer m_orderComparer; // Comparer for order keys
/// <summary>
/// Constructs a new, empty grouping for the given key that will order its
/// elements with the given order-key comparer.
/// </summary>
internal OrderedGroupByGrouping(
TGroupKey groupKey,
IComparer orderComparer)
{
m_groupKey = groupKey;
m_values = new GrowingArray();
m_orderKeys = new GrowingArray();
m_orderComparer = orderComparer;
}
/// <summary>
/// The key this grouping represents.
/// </summary>
TGroupKey IGrouping.Key
{
get
{
return m_groupKey;
}
}
/// <summary>
/// Yields the first Count entries of the values array in index order.
/// </summary>
IEnumerator IEnumerable.GetEnumerator()
{
Contract.Assert(m_values != null);
int valueCount = m_values.Count;
TElement[] valueArray = m_values.InternalArray;
Contract.Assert(valueArray.Length >= valueCount); // valueArray.Length may be larger than valueCount
for (int i = 0; i < valueCount; i++)
{
yield return valueArray[i];
}
}
// Forwards to the IEnumerable implementation on this same object.
IEnumerator IEnumerable.GetEnumerator()
{
return ((IEnumerable)this).GetEnumerator();
}
/// <summary>
/// Add an element together with the order key that determines its position in the group.
/// </summary>
internal void Add(TElement value, TOrderKey orderKey)
{
Contract.Assert(m_values != null);
Contract.Assert(m_orderKeys != null);
// Appended in lock-step so index i of one array corresponds to index i of the other.
m_values.Add(value);
m_orderKeys.Add(orderKey);
}
/// <summary>
/// No more elements will be added, so we can sort the group now.
/// </summary>
internal void DoneAdding()
{
Contract.Assert(m_values != null);
Contract.Assert(m_orderKeys != null);
// Keyed sort: order keys are the sort keys, values are reordered alongside them.
// Only the first m_values.Count slots of the internal arrays are sorted.
Array.Sort(
m_orderKeys.InternalArray, m_values.InternalArray,
0, m_values.Count,
m_orderComparer);
#if DEBUG
// Debug builds only: with the keys gone, the m_orderKeys assertions in Add() and
// DoneAdding() will fire if either is called again.
m_orderKeys = null; // Any future calls to Add() or DoneAdding() will fail
#endif
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// GroupByQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
using IEnumerator=System.Collections.IEnumerator;
namespace System.Linq.Parallel
{
///
/// The operator type for GroupBy statements. This operator groups the input based on
/// a key-selection routine, yielding one-to-many values of key-to-elements. The
/// implementation is very much like the hash join operator, in which we first build
/// a big hashtable of the input; then we just iterate over each unique key in the
/// hashtable, yielding it plus all of the elements with the same key.
///
///
///
///
internal sealed class GroupByQueryOperator :
UnaryQueryOperator>
{
private readonly Func m_keySelector; // Key selection function.
private readonly Func m_elementSelector; // Optional element selection function.
private readonly IEqualityComparer m_keyComparer; // An optional key comparison object.
//----------------------------------------------------------------------------------------
// Initializes a new group by operator.
//
// Arguments:
// child - the child operator or data source from which to pull data
// keySelector - a delegate representing the key selector function
// elementSelector - a delegate representing the element selector function
// keyComparer - an optional key comparison routine
//
// Assumptions:
// keySelector must be non null.
// elementSelector must be non null.
//
internal GroupByQueryOperator(IEnumerable child,
Func keySelector,
Func elementSelector,
IEqualityComparer keyComparer)
:base(child)
{
Contract.Assert(child != null, "child data source cannot be null");
Contract.Assert(keySelector != null, "need a selector function");
Contract.Assert(elementSelector != null ||
typeof(TSource) == typeof(TElement), "need an element function if TSource!=TElement");
m_keySelector = keySelector;
m_elementSelector = elementSelector;
m_keyComparer = keyComparer;
SetOrdinalIndexState(OrdinalIndexState.Shuffled);
}
internal override void WrapPartitionedStream(
PartitionedStream inputStream, IPartitionedStreamRecipient> recipient,
bool preferStriping, QuerySettings settings)
{
// Hash-repartion the source stream
if (Child.OutputOrdered)
{
WrapPartitionedStreamHelperOrdered(
ExchangeUtilities.HashRepartitionOrdered(
inputStream, m_keySelector, m_keyComparer, null, settings.CancellationState.MergedCancellationToken),
recipient,
settings.CancellationState.MergedCancellationToken
);
}
else
{
WrapPartitionedStreamHelper(
ExchangeUtilities.HashRepartition(
inputStream, m_keySelector, m_keyComparer, null, settings.CancellationState.MergedCancellationToken),
recipient,
settings.CancellationState.MergedCancellationToken
);
}
}
//---------------------------------------------------------------------------------------
// This is a helper method. WrapPartitionedStream decides what type TKey is going
// to be, and then call this method with that key as a generic parameter.
//
private void WrapPartitionedStreamHelper(
PartitionedStream, TKey> hashStream,
IPartitionedStreamRecipient> recipient,
CancellationToken cancellationToken)
{
int partitionCount = hashStream.PartitionCount;
PartitionedStream, TKey> outputStream =
new PartitionedStream, TKey>(partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled);
// If there is no element selector, we return a special identity enumerator. Otherwise,
// we return one that will apply the element selection function during enumeration.
for (int i = 0; i < partitionCount; i++)
{
if (m_elementSelector == null)
{
Contract.Assert(typeof(TSource) == typeof(TElement));
var enumerator = new GroupByIdentityQueryOperatorEnumerator(
hashStream[i], m_keyComparer, cancellationToken);
outputStream[i] = (QueryOperatorEnumerator, TKey>)(object)enumerator;
}
else
{
outputStream[i] = new GroupByElementSelectorQueryOperatorEnumerator(
hashStream[i], m_keyComparer, m_elementSelector, cancellationToken);
}
}
recipient.Receive(outputStream);
}
//---------------------------------------------------------------------------------------
// This is a helper method. WrapPartitionedStream decides what type TKey is going
// to be, and then call this method with that key as a generic parameter.
//
private void WrapPartitionedStreamHelperOrdered(
PartitionedStream, TKey> hashStream,
IPartitionedStreamRecipient> recipient,
CancellationToken cancellationToken)
{
int partitionCount = hashStream.PartitionCount;
PartitionedStream, TKey> outputStream =
new PartitionedStream, TKey>(partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled);
// If there is no element selector, we return a special identity enumerator. Otherwise,
// we return one that will apply the element selection function during enumeration.
IComparer orderComparer = hashStream.KeyComparer;
for (int i = 0; i < partitionCount; i++)
{
if (m_elementSelector == null)
{
Contract.Assert(typeof(TSource) == typeof(TElement));
var enumerator = new OrderedGroupByIdentityQueryOperatorEnumerator(
hashStream[i], m_keySelector, m_keyComparer, orderComparer, cancellationToken);
outputStream[i] = (QueryOperatorEnumerator, TKey>)(object)enumerator;
}
else
{
outputStream[i] = new OrderedGroupByElementSelectorQueryOperatorEnumerator(
hashStream[i], m_keySelector, m_elementSelector, m_keyComparer, orderComparer,
cancellationToken);
}
}
recipient.Receive(outputStream);
}
//-----------------------------------------------------------------------------------
// Override of the query operator base class's Open method. Opens the child operator
// and wraps its results in a UnaryQueryOperatorResults bound to this operator.
//
internal override QueryResults> Open(QuerySettings settings, bool preferStriping)
{
    // The incoming preferStriping value is intentionally not propagated; the child is
    // always opened with false. Regardless of whether the parent prefers striping or
    // range partitioning, the output will be hash-partitioned.
    QueryResults openedChild = Child.Open(settings, false);

    return new UnaryQueryOperatorResults(openedChild, this, settings, false);
}
//----------------------------------------------------------------------------------------
// Returns an enumerable that represents the query executing sequentially. The child's
// sequential query is wrapped with the supplied token and then grouped with GroupBy.
//
internal override IEnumerable> AsSequentialQuery(CancellationToken token)
{
    IEnumerable cancellableSource = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token);

    // An element selector was supplied: group and project in one step.
    if (m_elementSelector != null)
    {
        return cancellableSource.GroupBy(m_keySelector, m_elementSelector, m_keyComparer);
    }

    // No element selector: the elements are the source items themselves, so the
    // result is re-typed via object after asserting that the two types coincide.
    Contract.Assert(typeof(TElement) == typeof(TSource));
    return (IEnumerable>)(object)(cancellableSource.GroupBy(m_keySelector, m_keyComparer));
}
//---------------------------------------------------------------------------------------
// Whether this operator performs a premature merge. This operator always reports false.
//
internal override bool LimitsParallelism
{
    get
    {
        return false;
    }
}
}
//----------------------------------------------------------------------------------------
// The enumerator type responsible for grouping elements and yielding the key-value sets.
// Derived types supply BuildHashLookup; this base type walks the resulting lookup.
//
// Assumptions:
//     Just like the Join operator, this won't work properly at all if the analysis engine
//     didn't choose to hash partition. We will simply not yield correct groupings.
//
internal abstract class GroupByQueryOperatorEnumerator :
    QueryOperatorEnumerator, TOrderKey>
{
    protected readonly QueryOperatorEnumerator, TOrderKey> m_source; // The data source to enumerate.
    protected readonly IEqualityComparer m_keyComparer; // A key comparer.
    protected readonly CancellationToken m_cancellationToken; // Token handed in at construction.
    private Mutables m_mutables; // All of the mutable state; null until the first MoveNext.

    // Holder for the state that changes during enumeration.
    class Mutables
    {
        internal HashLookup, ListChunk> m_hashLookup; // The lookup with key-value mappings.
        internal int m_hashLookupIndex; // The current index within the lookup.
    }

    //----------------------------------------------------------------------------------------
    // Instantiates a new group by enumerator.
    //
    // Arguments:
    //     source            - the enumerator to pull data from; asserted non-null
    //     keyComparer       - the key comparer, stored as given
    //     cancellationToken - the token stored for use by derived types
    //
    protected GroupByQueryOperatorEnumerator(
        QueryOperatorEnumerator, TOrderKey> source,
        IEqualityComparer keyComparer, CancellationToken cancellationToken)
    {
        Contract.Assert(source != null);

        m_source = source;
        m_keyComparer = keyComparer;
        m_cancellationToken = cancellationToken;
    }

    //---------------------------------------------------------------------------------------
    // On the first call, MoveNext builds the hash lookup via BuildHashLookup. On that call
    // and every later one it advances through the lookup by index, yielding one grouping
    // per entry. Returns false once the index reaches the lookup's Count. Only
    // currentElement is assigned; currentKey is left untouched.
    //
    internal override bool MoveNext(ref IGrouping currentElement, ref TOrderKey currentKey)
    {
        Contract.Assert(m_source != null);

        // Lazily create the mutable state. Its absence also means the lookup has not
        // been built yet, so build it now and position the index before the first entry.
        Mutables state = m_mutables;
        if (state == null)
        {
            state = new Mutables();
            m_mutables = state;

            state.m_hashLookup = BuildHashLookup();
            Contract.Assert(state.m_hashLookup != null);
            state.m_hashLookupIndex = -1;
        }

        // Step to the next entry; stop when we have run past the last one.
        state.m_hashLookupIndex++;
        if (state.m_hashLookupIndex >= state.m_hashLookup.Count)
        {
            return false;
        }

        currentElement = new GroupByGrouping(
            state.m_hashLookup[state.m_hashLookupIndex]);
        return true;
    }

    //------------------------------------------------------------------------------------
    // Builds the hash lookup, transforming from TSource to TElement through whatever means is appropriate.
    //
    protected abstract HashLookup, ListChunk> BuildHashLookup();

    //------------------------------------------------------------------------------------
    // Disposes the underlying source enumerator. The disposing flag is not consulted.
    //
    protected override void Dispose(bool disposing)
    {
        m_source.Dispose();
    }
}
//---------------------------------------------------------------------------------------
// A specialization of the group by enumerator for yielding elements with the identity
// function: each source element is stored in its group unchanged.
//
internal sealed class GroupByIdentityQueryOperatorEnumerator :
    GroupByQueryOperatorEnumerator
{
    //---------------------------------------------------------------------------------------
    // Instantiates a new group by enumerator; all arguments are forwarded to the base type.
    //
    internal GroupByIdentityQueryOperatorEnumerator(
        QueryOperatorEnumerator, TOrderKey> source,
        IEqualityComparer keyComparer, CancellationToken cancellationToken)
        : base(source, keyComparer, cancellationToken)
    {
    }

    //-----------------------------------------------------------------------------------
    // Builds the hash lookup by draining m_source. Each pair's Second is used as the
    // grouping key and its First is appended, as-is, to the list stored under that key.
    //
    protected override HashLookup, ListChunk> BuildHashLookup()
    {
        const int INITIAL_CHUNK_SIZE = 2;

        var lookup = new HashLookup, ListChunk>(new WrapperEqualityComparer(m_keyComparer));

        Pair current = default(Pair);
        TOrderKey ignoredOrderKey = default(TOrderKey);
        int seen = 0;

        while (m_source.MoveNext(ref current, ref ignoredOrderKey))
        {
            // Poll for cancellation only when the masked running count is zero,
            // rather than on every element.
            if ((seen++ & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(m_cancellationToken);
            }

            // Wrap the key so it can be used with the lookup.
            var groupKey = new Wrapper(current.Second);

            // Reuse the list already registered for this key, or register a new one.
            ListChunk group = null;
            bool found = lookup.TryGetValue(groupKey, ref group);
            if (!found)
            {
                group = new ListChunk(INITIAL_CHUNK_SIZE);
                lookup.Add(groupKey, group);
            }
            Contract.Assert(group != null);

            // Append the source element itself to its group.
            group.Add(current.First);
        }

        return lookup;
    }
}
//----------------------------------------------------------------------------------------
// A specialization of the group by enumerator for yielding elements with any arbitrary
// element selection function: each source element is passed through the selector before
// being stored in its group.
//
internal sealed class GroupByElementSelectorQueryOperatorEnumerator :
    GroupByQueryOperatorEnumerator
{
    private readonly Func m_elementSelector; // Function to select elements.

    //---------------------------------------------------------------------------------------
    // Instantiates a new group by enumerator. The element selector is asserted non-null;
    // the remaining arguments are forwarded to the base type.
    //
    internal GroupByElementSelectorQueryOperatorEnumerator(
        QueryOperatorEnumerator, TOrderKey> source,
        IEqualityComparer keyComparer, Func elementSelector, CancellationToken cancellationToken) :
        base(source, keyComparer, cancellationToken)
    {
        Contract.Assert(elementSelector != null);

        m_elementSelector = elementSelector;
    }

    //------------------------------------------------------------------------------------
    // Builds the hash lookup by draining m_source. Each pair's Second is used as the
    // grouping key; the result of applying m_elementSelector to its First is appended
    // to the list stored under that key.
    //
    protected override HashLookup, ListChunk> BuildHashLookup()
    {
        const int INITIAL_CHUNK_SIZE = 2;

        var lookup = new HashLookup, ListChunk>(new WrapperEqualityComparer(m_keyComparer));

        Pair current = default(Pair);
        TOrderKey ignoredOrderKey = default(TOrderKey);
        int seen = 0;

        while (m_source.MoveNext(ref current, ref ignoredOrderKey))
        {
            // Poll for cancellation only when the masked running count is zero,
            // rather than on every element.
            if ((seen++ & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(m_cancellationToken);
            }

            // Wrap the key so it can be used with the lookup.
            var groupKey = new Wrapper(current.Second);

            // Reuse the list already registered for this key, or register a new one.
            ListChunk group = null;
            bool found = lookup.TryGetValue(groupKey, ref group);
            if (!found)
            {
                group = new ListChunk(INITIAL_CHUNK_SIZE);
                lookup.Add(groupKey, group);
            }
            Contract.Assert(group != null);

            // Project the source element through the selector and append the result.
            group.Add(m_elementSelector(current.First));
        }

        return lookup;
    }
}
//----------------------------------------------------------------------------------------
// Ordered version of the GroupBy operator.
//
internal abstract class OrderedGroupByQueryOperatorEnumerator :
QueryOperatorEnumerator, TOrderKey>
{
protected readonly QueryOperatorEnumerator, TOrderKey> m_source; // The data source to enumerate.
private readonly Func m_keySelector; // The key selection routine.
protected readonly IEqualityComparer