Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / clr / src / BCL / System / Collections / Concurrent / PartitionerStatic.cs / 1305376 / PartitionerStatic.cs
#pragma warning disable 0420
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// PartitionerStatic.cs
//
// [....]
//
// A class of default partitioners for Partitioner
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Security.Permissions;
using System.Threading;
using System.Diagnostics.Contracts;
using System.Runtime.InteropServices;
namespace System.Collections.Concurrent
{
// The static class Partitioners implements 3 default partitioning strategies:
// 1. dynamic load balance partitioning for indexable data source (IList and arrays)
// 2. static partitioning for indexable data source (IList and arrays)
// 3. dynamic load balance partitioning for enumerables. Enumerables have indexes, which are the natural order
// of elements, but enuemrators are not indexable
// - data source of type IList/arrays have both dynamic and static partitioning, as 1 and 3.
// We assume that the source data of IList/Array is not changing concurrently.
// - data source of type IEnumerable can only be partitioned dynamically (load-balance)
// - Dynamic partitioning methods 1 and 3 are same, both being dynamic and load-balance. But the
// implementation is different for data source of IList/Array vs. IEnumerable:
// * When the source collection is IList/Arrays, we use Interlocked on the shared index;
// * When the source collection is IEnumerable, we use Monitor to wrap around the access to the source
// enumerator.
///
/// Provides common partitioning strategies for arrays, lists, and enumerables.
///
///
///
/// The static methods on are all thread-safe and may be used concurrently
/// from multiple threads. However, while a created partitioner is in use, the underlying data source
/// should not be modified, whether from the same thread that's using a partitioner or from a separate
/// thread.
///
///
[HostProtection(Synchronization = true, ExternalThreading = true)]
public static class Partitioner
{
///
/// Creates an orderable partitioner from an
/// instance.
///
/// Type of the elements in source list.
/// The list to be partitioned.
///
/// A Boolean value that indicates whether the created partitioner should dynamically
/// load balance between partitions rather than statically partition.
///
///
/// An orderable partitioner based on the input list.
///
public static OrderablePartitioner Create(IList list, bool loadBalance)
{
if (list == null)
{
throw new ArgumentNullException("list");
}
if (loadBalance)
{
return (new DynamicPartitionerForIList(list));
}
else
{
return (new StaticIndexRangePartitionerForIList(list));
}
}
///
/// Creates an orderable partitioner from a instance.
///
/// Type of the elements in source array.
/// The array to be partitioned.
///
/// A Boolean value that indicates whether the created partitioner should dynamically load balance
/// between partitions rather than statically partition.
///
///
/// An orderable partitioner based on the input array.
///
public static OrderablePartitioner Create(TSource[] array, bool loadBalance)
{
// This implementation uses 'ldelem' instructions for element retrieval, rather than using a
// method call.
if (array == null)
{
throw new ArgumentNullException("array");
}
if (loadBalance)
{
return (new DynamicPartitionerForArray(array));
}
else
{
return (new StaticIndexRangePartitionerForArray(array));
}
}
///
/// Creates an orderable partitioner from a instance.
///
/// Type of the elements in source enumerable.
/// The enumerable to be partitioned.
///
/// An orderable partitioner based on the input array.
///
///
/// The ordering used in the created partitioner is determined by the natural order of the elements
/// as retrieved from the source enumerable.
///
public static OrderablePartitioner Create(IEnumerable source)
{
return Create(source, -1);
}
// Internal version that allows user to specify the maxChunkSize, rather than using the default.
// Used by range partitioning methods to insure that only one range at a time is chunked.
// A maxChunkSize of -1 means "use the default".
internal static OrderablePartitioner Create(IEnumerable source, int maxChunkSize)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
// Sanity checks. If and when we make this method public, these should be converted to exceptions.
Contract.Assert(maxChunkSize != 0, "maxChunkSize specified as 0.");
Contract.Assert((maxChunkSize == -1) || (maxChunkSize < (1 << 29)), "maxChunkSize out of range");
return (new DynamicPartitionerForIEnumerable(source, maxChunkSize));
}
#if !PFX_LEGACY_3_5
/// Creates a partitioner that chunks the user-specified range.
/// The lower, inclusive bound of the range.
/// The upper, exclusive bound of the range.
/// A partitioner.
/// The argument is
/// less than or equal to the argument.
public static OrderablePartitioner> Create(long fromInclusive, long toExclusive)
{
// How many chunks do we want to divide the range into? If this is 1, then the
// answer is "one chunk per core". Generally, though, you'll achieve better
// load balancing on a busy system if you make it higher than 1.
int coreOversubscriptionRate = 3;
if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
long rangeSize = (toExclusive - fromInclusive) /
(Environment.ProcessorCount * coreOversubscriptionRate);
if (rangeSize == 0) rangeSize = 1;
return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time
}
/// Creates a partitioner that chunks the user-specified range.
/// The lower, inclusive bound of the range.
/// The upper, exclusive bound of the range.
/// The size of each subrange.
/// A partitioner.
/// The argument is
/// less than or equal to the argument.
/// The argument is
/// less than or equal to 0.
public static OrderablePartitioner> Create(long fromInclusive, long toExclusive, long rangeSize)
{
if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize");
return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time
}
// Private method to parcel out range tuples.
private static IEnumerable> CreateRanges(long fromInclusive, long toExclusive, long rangeSize)
{
// Enumerate all of the ranges
long from, to;
bool shouldQuit = false;
for (long i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize)
{
from = i;
try { checked { to = i + rangeSize; } }
catch (OverflowException)
{
to = toExclusive;
shouldQuit = true;
}
if (to > toExclusive) to = toExclusive;
yield return new Tuple(from, to);
}
}
/// Creates a partitioner that chunks the user-specified range.
/// The lower, inclusive bound of the range.
/// The upper, exclusive bound of the range.
/// A partitioner.
/// The argument is
/// less than or equal to the argument.
public static OrderablePartitioner> Create(int fromInclusive, int toExclusive)
{
// How many chunks do we want to divide the range into? If this is 1, then the
// answer is "one chunk per core". Generally, though, you'll achieve better
// load balancing on a busy system if you make it higher than 1.
int coreOversubscriptionRate = 3;
if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
int rangeSize = (toExclusive - fromInclusive) /
(Environment.ProcessorCount * coreOversubscriptionRate);
if (rangeSize == 0) rangeSize = 1;
return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time
}
/// Creates a partitioner that chunks the user-specified range.
/// The lower, inclusive bound of the range.
/// The upper, exclusive bound of the range.
/// The size of each subrange.
/// A partitioner.
/// The argument is
/// less than or equal to the argument.
/// The argument is
/// less than or equal to 0.
public static OrderablePartitioner> Create(int fromInclusive, int toExclusive, int rangeSize)
{
if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize");
return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time
}
// Private method to parcel out range tuples.
private static IEnumerable> CreateRanges(int fromInclusive, int toExclusive, int rangeSize)
{
// Enumerate all of the ranges
int from, to;
bool shouldQuit = false;
for (int i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize)
{
from = i;
try { checked { to = i + rangeSize; } }
catch (OverflowException)
{
to = toExclusive;
shouldQuit = true;
}
if (to > toExclusive) to = toExclusive;
yield return new Tuple(from, to);
}
}
#endif
#region DynamicPartitionEnumerator_Abstract class
///
/// DynamicPartitionEnumerator_Abstract defines the enumerator for each partition for the dynamic load-balance
/// partitioning algorithm.
/// - Partition is an enumerator of KeyValuePairs, each corresponding to an item in the data source:
/// the key is the index in the source collection; the value is the item itself.
/// - a set of such partitions share a reader over data source. The type of the reader is specified by
/// TSourceReader.
/// - each partition requests a contiguous chunk of elements at a time from the source data. The chunk
/// size is initially 1, and doubles every time until it reaches the maximum chunk size.
/// The implementation for GrabNextChunk() method has two versions: one for data source of IndexRange
/// types (IList and the array), one for data source of IEnumerable.
/// - The method "Reset" is not supported for any partitioning algorithm.
/// - The implementation for MoveNext() method is same for all dynanmic partitioners, so we provide it
/// in this abstract class.
///
/// Type of the elements in the data source
/// Type of the reader on the data source
//TSourceReader is
// - IList, when source data is IList, the shared reader is source data itself
// - TSource[], when source data is TSource[], the shared reader is source data itself
// - IEnumerator, when source data is IEnumerable, and the shared reader is an
// enumerator of the source data
private abstract class DynamicPartitionEnumerator_Abstract : IEnumerator>
{
//----------------- common fields and constructor for all dynamic partitioners -----------------
//--- shared by all dervied class with souce data type: IList, Array, and IEnumerator
protected readonly TSourceReader m_sharedReader;
protected static int s_defaultMaxChunkSize = GetDefaultChunkSize();
//deferred allocating in MoveNext() with initial value 0, to avoid false sharing
//we also use the fact that: (m_currentChunkSize==null) means MoveNext is never called on this enumerator
protected Shared m_currentChunkSize;
//deferring allocation in MoveNext() with initial value -1, to avoid false sharing
protected Shared m_localOffset;
private const int CHUNK_DOUBLING_RATE = 3; // Double the chunk size every this many grabs
private int m_doublingCountdown; // Number of grabs remaining until chunk size doubles
protected readonly int m_maxChunkSize; // Max chunk size specified by caller, or s_defaultMaxChunkSize
// m_sharedIndex shared by this set of partitions, and particularly when m_sharedReader is IEnuerable
// it serves as tracking of the natual order of elements in m_sharedReader
// the value of this field is passed in from outside (already initialized) by the constructor,
protected readonly Shared m_sharedIndex;
protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, Shared sharedIndex)
: this(sharedReader, sharedIndex, -1)
{
}
protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, Shared sharedIndex, int maxChunkSize)
{
Contract.Assert((maxChunkSize == -1) || (maxChunkSize > 0), "maxChunkSize 0 or < -1");
m_sharedReader = sharedReader;
m_sharedIndex = sharedIndex;
if (maxChunkSize == -1) m_maxChunkSize = s_defaultMaxChunkSize;
else m_maxChunkSize = maxChunkSize;
}
// ---------------- abstract method declarations --------------
///
/// Abstract method to request a contiguous chunk of elements from the source collection
///
/// specified number of elements requested
///
/// true if we successfully reserved at least one element (up to #=requestedChunkSize)
/// false if all elements in the source collection have been reserved.
///
//GrabNextChunk does the following:
// - grab # of requestedChunkSize elements from source data through shared reader,
// - at the time of function returns, m_currentChunkSize is updated with the number of
// elements actually got ----gined (<=requestedChunkSize).
// - GrabNextChunk returns true if at least one element is assigned to this partition;
// false if the shared reader already hits the last element of the source data before
// we call GrabNextChunk
protected abstract bool GrabNextChunk(int requestedChunkSize);
///
/// Abstract property, returns whether or not the shared reader has already read the last
/// element of the source data
///
protected abstract bool HasNoElementsLeft { get; set; }
///
/// Get the current element in the current partition. Property required by IEnumerator interface
/// This property is abstract because the implementation is different depending on the type
/// of the source data: IList, Array or IEnumerable
///
public abstract KeyValuePair Current { get; }
///
/// Dispose is abstract, and depends on the type of the source data:
/// - For source data type IList and Array, the type of the shared reader is just the dataitself.
/// We don't do anything in Dispose method for IList and Array.
/// - For source data type IEnumerable, the type of the shared reader is an enumerator we created.
/// Thus we need to dispose this shared reader enumerator, when there is no more active partitions
/// left.
///
public abstract void Dispose();
///
/// Reset on partitions is not supported
///
public void Reset()
{
throw new NotSupportedException();
}
///
/// Get the current element in the current partition. Property required by IEnumerator interface
///
Object IEnumerator.Current
{
get
{
return ((DynamicPartitionEnumerator_Abstract)this).Current;
}
}
///
/// Moves to the next element if any.
/// Try current chunk first, if the current chunk do not have any elements left, then we
/// attempt to grab a chunk from the source collection.
///
///
/// true if successfully moving to the next position;
/// false otherwise, if and only if there is no more elements left in the current chunk
/// AND the source collection is exhausted.
///
public bool MoveNext()
{
//perform deferred allocating of the local variables.
if (m_localOffset == null)
{
Contract.Assert(m_currentChunkSize == null);
m_localOffset = new Shared(-1);
m_currentChunkSize = new Shared(0);
m_doublingCountdown = CHUNK_DOUBLING_RATE;
}
if (m_localOffset.Value < m_currentChunkSize.Value - 1)
//attempt to grab the next element from the local chunk
{
m_localOffset.Value++;
return true;
}
else
//otherwise it means we exhausted the local chunk
//grab a new chunk from the source enumerator
{
Contract.Assert(m_localOffset.Value == m_currentChunkSize.Value - 1);
//set the requested chunk size to a proper value
int requestedChunkSize;
if (m_currentChunkSize.Value == 0) //first time grabbing from source enumerator
{
requestedChunkSize = 1;
}
else if (m_doublingCountdown > 0)
{
requestedChunkSize = m_currentChunkSize.Value;
}
else
{
requestedChunkSize = Math.Min(m_currentChunkSize.Value * 2, m_maxChunkSize);
m_doublingCountdown = CHUNK_DOUBLING_RATE; // reset
}
// Decrement your doubling countdown
m_doublingCountdown--;
Contract.Assert(requestedChunkSize > 0 && requestedChunkSize <= m_maxChunkSize);
//GrabNextChunk will update the value of m_currentChunkSize
if (GrabNextChunk(requestedChunkSize))
{
Contract.Assert(m_currentChunkSize.Value <= requestedChunkSize && m_currentChunkSize.Value > 0);
m_localOffset.Value = 0;
return true;
}
else
{
return false;
}
}
}
}
#endregion
#region Dynamic Partitioner for source data of IEnuemrable<> type
///
/// Inherits from DynamicPartitioners
/// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
/// of EnumerableOfPartitionsForIEnumerator defined internally
///
/// Type of elements in the source data
private class DynamicPartitionerForIEnumerable : OrderablePartitioner
{
IEnumerable m_source;
int m_maxChunkSize; // a value of -1 means "use default"
//constructor
internal DynamicPartitionerForIEnumerable(IEnumerable source, int maxChunkSize)
: base(true, false, true)
{
m_source = source;
m_maxChunkSize = maxChunkSize;
}
///
/// Overrides OrderablePartitioner.GetOrderablePartitions.
/// Partitions the underlying collection into the given number of orderable partitions.
///
/// number of partitions requested
/// A list containing enumerators.
override public IList>> GetOrderablePartitions(int partitionCount)
{
if (partitionCount <= 0)
{
throw new ArgumentOutOfRangeException("partitionCount");
}
IEnumerator>[] partitions
= new IEnumerator>[partitionCount];
IEnumerable> partitionEnumerable = new InternalPartitionEnumerable(m_source.GetEnumerator(), m_maxChunkSize);
for (int i = 0; i < partitionCount; i++)
{
partitions[i] = partitionEnumerable.GetEnumerator();
}
return partitions;
}
///
/// Overrides OrderablePartitioner.GetOrderableDyanmicPartitions
///
/// a enumerable collection of orderable partitions
override public IEnumerable> GetOrderableDynamicPartitions()
{
return new InternalPartitionEnumerable(m_source.GetEnumerator(), m_maxChunkSize);
}
///
/// Whether additional partitions can be created dynamically.
///
override public bool SupportsDynamicPartitions
{
get { return true; }
}
#region Internal classes: InternalPartitionEnumerable, InternalPartitionEnumerator
///
/// Provides customized implementation for source data of IEnumerable
/// Different from the counterpart for IList/Array, this enumerable maintains several additional fields
/// shared by the partitions it owns, including a boolean "m_hasNoElementsLef", a shared lock, and a
/// shared count "m_activePartitionCount"
///
private class InternalPartitionEnumerable : IEnumerable>, IDisposable
{
//reader through which we access the source data
private readonly IEnumerator m_sharedReader;
private Shared m_sharedIndex;//initial value -1
//fields shared by all partitions that this Enumerable owns
private Shared m_hasNoElementsLeft;//deferring allocation by enumerator
//shared synchronization lock, created by this Enumerable
private object m_sharedLock;//deferring allocation by enumerator
private bool m_disposed;
private Shared m_activePartitionCount;
private readonly int m_maxChunkSize;
internal InternalPartitionEnumerable(IEnumerator sharedReader, int maxChunkSize)
{
m_sharedReader = sharedReader;
m_sharedIndex = new Shared(-1);
m_hasNoElementsLeft = new Shared(false);
m_sharedLock = new object();
m_activePartitionCount = new Shared(0);
m_maxChunkSize = maxChunkSize;
}
public IEnumerator> GetEnumerator()
{
if (m_disposed)
{
throw new ObjectDisposedException(Environment.GetResourceString("PartitionerStatic_CanNotCallGetEnumeratorAfterSourceHasBeenDisposed"));
}
else
{
return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex,
m_hasNoElementsLeft, m_sharedLock, m_activePartitionCount, this, m_maxChunkSize);
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return ((InternalPartitionEnumerable)this).GetEnumerator();
}
public void Dispose()
{
if (!m_disposed)
{
m_disposed = true;
m_sharedReader.Dispose();
}
}
}
///
/// Inherits from DynamicPartitionEnumerator_Abstract directly
/// Provides customized implementation for: GrabNextChunk, HasNoElementsLeft, Current, Dispose
///
private class InternalPartitionEnumerator : DynamicPartitionEnumerator_Abstract>
{
//---- fields ----
//cached local copy of the current chunk
private KeyValuePair[] m_localList; //defer allocating to avoid false sharing
// the values of the following two fields are passed in from
// outside(already initialized) by the constructor,
private readonly Shared m_hasNoElementsLeft;
private readonly object m_sharedLock;
private readonly Shared m_activePartitionCount;
private InternalPartitionEnumerable m_enumerable;
//constructor
internal InternalPartitionEnumerator(
IEnumerator sharedReader,
Shared sharedIndex,
Shared hasNoElementsLeft,
object sharedLock,
Shared activePartitionCount,
InternalPartitionEnumerable enumerable,
int maxChunkSize)
: base(sharedReader, sharedIndex, maxChunkSize)
{
m_hasNoElementsLeft = hasNoElementsLeft;
m_sharedLock = sharedLock;
m_enumerable = enumerable;
m_activePartitionCount = activePartitionCount;
Interlocked.Increment(ref m_activePartitionCount.Value);
}
//overriding methods
///
/// Reserves a contiguous range of elements from source data
///
/// specified number of elements requested
///
/// true if we successfully reserved at least one element (up to #=requestedChunkSize)
/// false if all elements in the source collection have been reserved.
///
override protected bool GrabNextChunk(int requestedChunkSize)
{
Contract.Assert(requestedChunkSize > 0);
if (HasNoElementsLeft)
{
return false;
}
else
{
lock (m_sharedLock)
{
if (HasNoElementsLeft)
{
return false;
}
else
{
try
{
int actualChunkSize;
//enumerate over source data until either we got #requestedChunkSize of elements or
//MoveNext returns false
for (actualChunkSize = 0; actualChunkSize < requestedChunkSize; actualChunkSize++)
{
if (m_sharedReader.MoveNext())
{
//defer allocating to avoid false sharing
if (m_localList == null)
{
m_localList = new KeyValuePair[m_maxChunkSize];
}
Contract.Assert(m_sharedIndex != null); //already been allocated in MoveNext() before calling GrabNextChunk
m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
m_localList[actualChunkSize]
= new KeyValuePair(m_sharedIndex.Value,
m_sharedReader.Current);
}
else
{
//if MoveNext() return false, we set the flag to inform other partitions
HasNoElementsLeft = true;
break;
}
}
if (actualChunkSize > 0)
{
m_currentChunkSize.Value = actualChunkSize;
return true;
}
else
{
return false;
}
}
catch
{
// If an exception occurs, don't let the other enumerators try to enumerate.
// NOTE: this could instead throw an InvalidOperationException, but that would be unexpected
// and not helpful to the end user. We know the root cause is being communicated already.)
HasNoElementsLeft = true;
throw;
}
}
}
}
}
///
/// Returns whether or not the shared reader has already read the last
/// element of the source data
///
///
/// We cannot call m_sharedReader.MoveNext(), to see if it hits the last element
/// or not, because we can't undo MoveNext(). Thus we need to maintain a shared
/// boolean value m_hasNoElementsLeft across all partitions
///
override protected bool HasNoElementsLeft
{
get { return m_hasNoElementsLeft.Value; }
set
{
//we only set it from false to true once
//we should never set it back in any circumstances
Contract.Assert(value);
Contract.Assert(!m_hasNoElementsLeft.Value);
m_hasNoElementsLeft.Value = true;
}
}
override public KeyValuePair Current
{
get
{
//verify that MoveNext is at least called once before Current is called
if (m_currentChunkSize == null)
{
throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
}
Contract.Assert(m_localList != null);
Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
return (m_localList[m_localOffset.Value]);
}
}
///
/// If the current partition is to be disposed, we decrement the number of active partitions
/// for the shared reader.
/// If the number of active partitions becomes 0, we need to dispose the shared reader we created
///
override public void Dispose()
{
if (Interlocked.Decrement(ref m_activePartitionCount.Value) == 0)
{
m_enumerable.Dispose();
}
}
}
#endregion
}
#endregion
#region Dynamic Partitioner for source data of IndexRange types (IList<> and Array<>)
///
/// Dynamic load-balance partitioner. This class is abstract and to be derived from by
/// the customized partitioner classes for IList, Array, and IEnumerable
///
/// Type of the elements in the source data
/// Type of the source data collection
private abstract class DynamicPartitionerForIndexRange_Abstract : OrderablePartitioner
{
// TCollection can be: IList, TSource[] and IEnumerable
// Derived classes specify TCollection, and implement the abstract method GetOrderableDynamicPartitions_Factory accordingly
TCollection m_data;
///
/// Constructs a new orderable partitioner
///
/// source data collection
protected DynamicPartitionerForIndexRange_Abstract(TCollection data)
: base(true, false, true)
{
m_data = data;
}
///
/// Partition the source data and create an enumerable over the resulting partitions.
///
/// the source data collection
/// an enumerable of partitions of
protected abstract IEnumerable> GetOrderableDynamicPartitions_Factory(TCollection data);
///
/// Overrides OrderablePartitioner.GetOrderablePartitions.
/// Partitions the underlying collection into the given number of orderable partitions.
///
/// number of partitions requested
/// A list containing enumerators.
override public IList>> GetOrderablePartitions(int partitionCount)
{
if (partitionCount <= 0)
{
throw new ArgumentOutOfRangeException("partitionCount");
}
IEnumerator>[] partitions
= new IEnumerator>[partitionCount];
IEnumerable> partitionEnumerable = GetOrderableDynamicPartitions_Factory(m_data);
for (int i = 0; i < partitionCount; i++)
{
partitions[i] = partitionEnumerable.GetEnumerator();
}
return partitions;
}
///
/// Overrides OrderablePartitioner.GetOrderableDyanmicPartitions
///
/// a enumerable collection of orderable partitions
override public IEnumerable> GetOrderableDynamicPartitions()
{
return GetOrderableDynamicPartitions_Factory(m_data);
}
///
/// Whether additional partitions can be created dynamically.
///
override public bool SupportsDynamicPartitions
{
get { return true; }
}
}
///
/// Defines dynamic partition for source data of IList and Array.
/// This class inherits DynamicPartitionEnumerator_Abstract
/// - implements GrabNextChunk, HasNoElementsLeft, and Dispose methods for IList and Array
/// - Current property still remains abstract, implementation is different for IList and Array
/// - introduces another abstract method SourceCount, which returns the number of elements in
/// the source data. Implementation differs for IList and Array
///
/// Type of the elements in the data source
/// Type of the reader on the source data
private abstract class DynamicPartitionEnumeratorForIndexRange_Abstract : DynamicPartitionEnumerator_Abstract
{
//fields
protected int m_startIndex; //initially zero
//constructor
protected DynamicPartitionEnumeratorForIndexRange_Abstract(TSourceReader sharedReader, Shared sharedIndex)
: base(sharedReader, sharedIndex)
{
}
//abstract methods
//the Current property is still abstract, and will be implemented by derived classes
//we add another abstract method SourceCount to get the number of elements from the source reader
///
/// Get the number of elements from the source reader.
/// It calls IList.Count or Array.Length
///
protected abstract int SourceCount { get; }
//overriding methods
///
/// Reserves a contiguous range of elements from source data
///
/// specified number of elements requested
///
/// true if we successfully reserved at least one element (up to #=requestedChunkSize)
/// false if all elements in the source collection have been reserved.
///
override protected bool GrabNextChunk(int requestedChunkSize)
{
Contract.Assert(requestedChunkSize > 0);
while (!HasNoElementsLeft)
{
Contract.Assert(m_sharedIndex != null);
long oldSharedIndex = m_sharedIndex.Value;
if (HasNoElementsLeft)
{
//HasNoElementsLeft situation changed from false to true immediately
//and oldSharedIndex becomes stale
return false;
}
//there won't be overflow, because the index of IList/array is int, and we
//have casted it to long.
long newSharedIndex = Math.Min(SourceCount - 1, oldSharedIndex + requestedChunkSize);
//the following CAS, if successful, reserves a chunk of elements [oldSharedIndex+1, newSharedIndex]
//inclusive in the source collection
if (Interlocked.CompareExchange(ref m_sharedIndex.Value, newSharedIndex, oldSharedIndex)
== oldSharedIndex)
{
//set up local indexes.
//m_currentChunkSize is always set to requestedChunkSize when source data had
//enough elements of what we requested
m_currentChunkSize.Value = (int)(newSharedIndex - oldSharedIndex);
m_localOffset.Value = -1;
m_startIndex = (int)(oldSharedIndex + 1);
return true;
}
}
//didn't get any element, return false;
return false;
}
///
/// Returns whether or not the shared reader has already read the last
/// element of the source data
///
override protected bool HasNoElementsLeft
{
get
{
Contract.Assert(m_sharedIndex != null);
return m_sharedIndex.Value >= SourceCount - 1;
}
set
{
Contract.Assert(false);
}
}
///
/// For source data type IList and Array, the type of the shared reader is just the data itself.
/// We don't do anything in Dispose method for IList and Array.
///
override public void Dispose()
{ }
}
///
/// Inherits from DynamicPartitioners
/// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
/// of EnumerableOfPartitionsForIList defined internally
///
/// Type of elements in the source data
private class DynamicPartitionerForIList : DynamicPartitionerForIndexRange_Abstract>
{
//constructor
internal DynamicPartitionerForIList(IList source)
: base(source)
{ }
//override methods
override protected IEnumerable> GetOrderableDynamicPartitions_Factory(IList m_data)
{
//m_data itself serves as shared reader
return new InternalPartitionEnumerable(m_data);
}
///
/// Inherits from PartitionList_Abstract
/// Provides customized implementation for source data of IList
///
private class InternalPartitionEnumerable : IEnumerable>
{
//reader through which we access the source data
private readonly IList m_sharedReader;
private Shared m_sharedIndex;
internal InternalPartitionEnumerable(IList sharedReader)
{
m_sharedReader = sharedReader;
m_sharedIndex = new Shared(-1);
}
public IEnumerator> GetEnumerator()
{
return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
}
IEnumerator IEnumerable.GetEnumerator()
{
return ((InternalPartitionEnumerable)this).GetEnumerator();
}
}
///
/// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
/// Provides customized implementation of SourceCount property and Current property for IList
///
private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract>
{
//constructor
internal InternalPartitionEnumerator(IList sharedReader, Shared sharedIndex)
: base(sharedReader, sharedIndex)
{ }
//overriding methods
override protected int SourceCount
{
get { return m_sharedReader.Count; }
}
///
/// return a KeyValuePair of the current element and its key
///
override public KeyValuePair Current
{
get
{
//verify that MoveNext is at least called once before Current is called
if (m_currentChunkSize == null)
{
throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
}
Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
return new KeyValuePair(m_startIndex + m_localOffset.Value,
m_sharedReader[m_startIndex + m_localOffset.Value]);
}
}
}
}
///
/// Inherits from DynamicPartitioners
/// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
/// of EnumerableOfPartitionsForArray defined internally
///
/// Type of elements in the source data
private class DynamicPartitionerForArray : DynamicPartitionerForIndexRange_Abstract
{
//constructor
internal DynamicPartitionerForArray(TSource[] source)
: base(source)
{ }
//override methods
override protected IEnumerable> GetOrderableDynamicPartitions_Factory(TSource[] m_data)
{
return new InternalPartitionEnumerable(m_data);
}
///
/// Inherits from PartitionList_Abstract
/// Provides customized implementation for source data of Array
///
private class InternalPartitionEnumerable : IEnumerable>
{
//reader through which we access the source data
private readonly TSource[] m_sharedReader;
private Shared m_sharedIndex;
internal InternalPartitionEnumerable(TSource[] sharedReader)
{
m_sharedReader = sharedReader;
m_sharedIndex = new Shared(-1);
}
IEnumerator IEnumerable.GetEnumerator()
{
return ((InternalPartitionEnumerable)this).GetEnumerator();
}
public IEnumerator> GetEnumerator()
{
return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
}
}
///
/// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
/// Provides customized implementation of SourceCount property and Current property for Array
///
private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract
{
//constructor
internal InternalPartitionEnumerator(TSource[] sharedReader, Shared sharedIndex)
: base(sharedReader, sharedIndex)
{ }
//overriding methods
override protected int SourceCount
{
get { return m_sharedReader.Length; }
}
override public KeyValuePair Current
{
get
{
//verify that MoveNext is at least called once before Current is called
if (m_currentChunkSize == null)
{
throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
}
Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
return new KeyValuePair(m_startIndex + m_localOffset.Value,
m_sharedReader[m_startIndex + m_localOffset.Value]);
}
}
}
}
#endregion
#region Static partitioning for IList and Array, abstract classes
///
/// Static partitioning over IList.
/// - dynamic and load-balance
/// - Keys are ordered within each partition
/// - Keys are ordered across partitions
/// - Keys are normalized
/// - Number of partitions is fixed once specified, and the elements of the source data are
/// distributed to each partition as evenly as possible.
///
/// type of the elements
/// Type of the source data collection
private abstract class StaticIndexRangePartitioner : OrderablePartitioner
{
protected StaticIndexRangePartitioner()
: base(true, true, true)
{ }
///
/// Abstract method to return the number of elements in the source data
///
protected abstract int SourceCount { get; }
///
/// Abstract method to create a partition that covers a range over source data,
/// starting from "startIndex", ending at "endIndex"
///
/// start index of the current partition on the source data
/// end index of the current partition on the source data
/// a partition enumerator over the specified range
// The partitioning algorithm is implemented in GetOrderablePartitions method
// This method delegates according to source data type IList/Array
protected abstract IEnumerator> CreatePartition(int startIndex, int endIndex);
///
/// Overrides OrderablePartitioner.GetOrderablePartitions
/// Return a list of partitions, each of which enumerate a fixed part of the source data
/// The elements of the source data are distributed to each partition as evenly as possible.
/// Specifically, if the total number of elements is N, and number of partitions is x, and N = a*x +b,
/// where a is the quotient, and b is the remainder. Then the first b partitions each has a + 1 elements,
/// and the last x-b partitions each has a elements.
/// For example, if N=10, x =3, then
/// partition 0 ranges [0,3],
/// partition 1 ranges [4,6],
/// partition 2 ranges [7,9].
/// This also takes care of the situation of (x>N), the last x-N partitions are empty enumerators.
/// An empty enumerator is indicated by
/// (m_startIndex == list.Count && m_endIndex == list.Count -1)
///
/// specified number of partitions
/// a list of partitions
override public IList>> GetOrderablePartitions(int partitionCount)
{
if (partitionCount <= 0)
{
throw new ArgumentOutOfRangeException("partitionCount");
}
int quotient, remainder;
quotient = Math.DivRem(SourceCount, partitionCount, out remainder);
IEnumerator>[] partitions = new IEnumerator>[partitionCount];
int lastEndIndex = -1;
for (int i = 0; i < partitionCount; i++)
{
int startIndex = lastEndIndex + 1;
if (i < remainder)
{
lastEndIndex = startIndex + quotient;
}
else
{
lastEndIndex = startIndex + quotient - 1;
}
partitions[i] = CreatePartition(startIndex, lastEndIndex);
}
return partitions;
}
}
///
/// Static Partition for IList/Array.
/// This class implements all methods required by IEnumerator interface, except for the Current property.
/// Current Property is different for IList and Array. Arrays calls 'ldelem' instructions for faster element
/// retrieval.
///
//We assume the source collection is not being updated concurrently. Otherwise it will break the
//static partitioning, since each partition operates on the source collection directly, it does
//not have a local cache of the elements assigned to them.
private abstract class StaticIndexRangePartition : IEnumerator>
{
//the start and end position in the source collection for the current partition
//the partition is empty if and only if
// (m_startIndex == m_data.Count && m_endIndex == m_data.Count-1)
protected readonly int m_startIndex;
protected readonly int m_endIndex;
//the current index of the current partition while enumerating on the source collection
protected volatile int m_offset;
///
/// Constructs an instance of StaticIndexRangePartition
///
/// the start index in the source collection for the current partition
/// the end index in the source collection for the current partition
protected StaticIndexRangePartition(int startIndex, int endIndex)
{
m_startIndex = startIndex;
m_endIndex = endIndex;
m_offset = startIndex - 1;
}
///
/// Current Property is different for IList and Array. Arrays calls 'ldelem' instructions for faster
/// element retrieval.
///
public abstract KeyValuePair Current { get; }
///
/// We don't dispose the source for IList and array
///
public void Dispose()
{ }
public void Reset()
{
throw new NotSupportedException();
}
///
/// Moves to the next item
/// Before the first MoveNext is called: m_offset == m_startIndex-1;
///
/// true if successful, false if there is no item left
public bool MoveNext()
{
if (m_offset < m_endIndex)
{
m_offset++;
return true;
}
else
{
//After we have enumerated over all elements, we set m_offset to m_endIndex +1.
//The reason we do this is, for an empty enumerator, we need to tell the Current
//property whether MoveNext has been called or not.
//For an empty enumerator, it starts with (m_offset == m_startIndex-1 == m_endIndex),
//and we don't set a new value to m_offset, then the above condition will always be
//true, and the Current property will mistakenly assume MoveNext is never called.
m_offset = m_endIndex + 1;
return false;
}
}
Object IEnumerator.Current
{
get
{
return ((StaticIndexRangePartition)this).Current;
}
}
}
#endregion
#region Static partitioning for IList
///
/// Inherits from StaticIndexRangePartitioner
/// Provides customized implementation of SourceCount and CreatePartition
///
///
private class StaticIndexRangePartitionerForIList : StaticIndexRangePartitioner>
{
IList m_list;
internal StaticIndexRangePartitionerForIList(IList list)
: base()
{
Contract.Assert(list != null);
m_list = list;
}
override protected int SourceCount
{
get { return m_list.Count; }
}
override protected IEnumerator> CreatePartition(int startIndex, int endIndex)
{
return new StaticIndexRangePartitionForIList(m_list, startIndex, endIndex);
}
}
///
/// Inherits from StaticIndexRangePartition
/// Provides customized implementation of Current property
///
///
private class StaticIndexRangePartitionForIList : StaticIndexRangePartition
{
//the source collection shared by all partitions
private volatile IList m_list;
internal StaticIndexRangePartitionForIList(IList list, int startIndex, int endIndex)
: base(startIndex, endIndex)
{
Contract.Assert(startIndex >= 0 && endIndex <= list.Count - 1);
m_list = list;
}
override public KeyValuePair Current
{
get
{
//verify that MoveNext is at least called once before Current is called
if (m_offset < m_startIndex)
{
throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
}
Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex);
return (new KeyValuePair(m_offset, m_list[m_offset]));
}
}
}
#endregion
#region static partitioning for Arrays
///
/// Inherits from StaticIndexRangePartitioner
/// Provides customized implementation of SourceCount and CreatePartition for Array
///
private class StaticIndexRangePartitionerForArray : StaticIndexRangePartitioner
{
TSource[] m_array;
internal StaticIndexRangePartitionerForArray(TSource[] array)
: base()
{
Contract.Assert(array != null);
m_array = array;
}
override protected int SourceCount
{
get { return m_array.Length; }
}
override protected IEnumerator> CreatePartition(int startIndex, int endIndex)
{
return new StaticIndexRangePartitionForArray(m_array, startIndex, endIndex);
}
}
///
/// Inherits from StaticIndexRangePartitioner
/// Provides customized implementation of SourceCount and CreatePartition
///
private class StaticIndexRangePartitionForArray : StaticIndexRangePartition
{
//the source collection shared by all partitions
private volatile TSource[] m_array;
internal StaticIndexRangePartitionForArray(TSource[] array, int startIndex, int endIndex)
: base(startIndex, endIndex)
{
Contract.Assert(startIndex >= 0 && endIndex <= array.Length - 1);
m_array = array;
}
override public KeyValuePair Current
{
get
{
//verify that MoveNext is at least called once before Current is called
if (m_offset < m_startIndex)
{
throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
}
Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex);
return (new KeyValuePair(m_offset, m_array[m_offset]));
}
}
}
#endregion
#region Utility functions
///
/// A very simple primitive that allows us to share a value across multiple threads.
///
///
private class Shared
{
internal TSource Value;
internal Shared(TSource value)
{
this.Value = value;
}
}
//--------------------
// The following part calculates the default chunk size. It is copied from System.Linq.Parallel.Scheduling,
// because mscorlib.dll cannot access System.Linq.Parallel.Scheduling
//--------------------
// The number of bytes we want "chunks" to be, when partitioning, etc. We choose 4 cache
// lines worth, assuming 128b cache line. Most (popular) architectures use 64b cache lines,
// but choosing 128b works for 64b too whereas a multiple of 64b isn't necessarily sufficient
// for 128b cache systems. So 128b it is.
private const int DEFAULT_BYTES_PER_CHUNK = 128 * 4;
private static int GetDefaultChunkSize()
{
int chunkSize;
if (typeof(TSource).IsValueType)
{
// @
if (typeof(TSource).StructLayoutAttribute.Value == LayoutKind.Explicit)
{
chunkSize = Math.Max(1, DEFAULT_BYTES_PER_CHUNK / Marshal.SizeOf(typeof(TSource)));
}
else
{
// We choose '128' because this ensures, no matter the actual size of the value type,
// the total bytes used will be a multiple of 128. This ensures it's cache aligned.
chunkSize = 128;
}
}
else
{
Contract.Assert((DEFAULT_BYTES_PER_CHUNK % IntPtr.Size) == 0, "bytes per chunk should be a multiple of pointer size");
chunkSize = (DEFAULT_BYTES_PER_CHUNK / IntPtr.Size);
}
return chunkSize;
}
#endregion
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
#pragma warning disable 0420
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// PartitionerStatic.cs
//
// [....]
//
// A class of default partitioners for Partitioner
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Security.Permissions;
using System.Threading;
using System.Diagnostics.Contracts;
using System.Runtime.InteropServices;
namespace System.Collections.Concurrent
{
// The static class Partitioners implements 3 default partitioning strategies:
// 1. dynamic load balance partitioning for indexable data source (IList and arrays)
// 2. static partitioning for indexable data source (IList and arrays)
// 3. dynamic load balance partitioning for enumerables. Enumerables have indexes, which are the natural order
// of elements, but enuemrators are not indexable
// - data source of type IList/arrays have both dynamic and static partitioning, as 1 and 3.
// We assume that the source data of IList/Array is not changing concurrently.
// - data source of type IEnumerable can only be partitioned dynamically (load-balance)
// - Dynamic partitioning methods 1 and 3 are same, both being dynamic and load-balance. But the
// implementation is different for data source of IList/Array vs. IEnumerable:
// * When the source collection is IList/Arrays, we use Interlocked on the shared index;
// * When the source collection is IEnumerable, we use Monitor to wrap around the access to the source
// enumerator.
///
/// Provides common partitioning strategies for arrays, lists, and enumerables.
///
///
///
/// The static methods on are all thread-safe and may be used concurrently
/// from multiple threads. However, while a created partitioner is in use, the underlying data source
/// should not be modified, whether from the same thread that's using a partitioner or from a separate
/// thread.
///
///
[HostProtection(Synchronization = true, ExternalThreading = true)]
public static class Partitioner
{
///
/// Creates an orderable partitioner from an
/// instance.
///
/// Type of the elements in source list.
/// The list to be partitioned.
///
/// A Boolean value that indicates whether the created partitioner should dynamically
/// load balance between partitions rather than statically partition.
///
///
/// An orderable partitioner based on the input list.
///
public static OrderablePartitioner Create(IList list, bool loadBalance)
{
if (list == null)
{
throw new ArgumentNullException("list");
}
if (loadBalance)
{
return (new DynamicPartitionerForIList(list));
}
else
{
return (new StaticIndexRangePartitionerForIList(list));
}
}
///
/// Creates an orderable partitioner from a instance.
///
/// Type of the elements in source array.
/// The array to be partitioned.
///
/// A Boolean value that indicates whether the created partitioner should dynamically load balance
/// between partitions rather than statically partition.
///
///
/// An orderable partitioner based on the input array.
///
public static OrderablePartitioner Create(TSource[] array, bool loadBalance)
{
// This implementation uses 'ldelem' instructions for element retrieval, rather than using a
// method call.
if (array == null)
{
throw new ArgumentNullException("array");
}
if (loadBalance)
{
return (new DynamicPartitionerForArray(array));
}
else
{
return (new StaticIndexRangePartitionerForArray(array));
}
}
///
/// Creates an orderable partitioner from a instance.
///
/// Type of the elements in source enumerable.
/// The enumerable to be partitioned.
///
/// An orderable partitioner based on the input array.
///
///
/// The ordering used in the created partitioner is determined by the natural order of the elements
/// as retrieved from the source enumerable.
///
public static OrderablePartitioner Create(IEnumerable source)
{
return Create(source, -1);
}
// Internal version that allows user to specify the maxChunkSize, rather than using the default.
// Used by range partitioning methods to insure that only one range at a time is chunked.
// A maxChunkSize of -1 means "use the default".
internal static OrderablePartitioner Create(IEnumerable source, int maxChunkSize)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
// Sanity checks. If and when we make this method public, these should be converted to exceptions.
Contract.Assert(maxChunkSize != 0, "maxChunkSize specified as 0.");
Contract.Assert((maxChunkSize == -1) || (maxChunkSize < (1 << 29)), "maxChunkSize out of range");
return (new DynamicPartitionerForIEnumerable(source, maxChunkSize));
}
#if !PFX_LEGACY_3_5
/// Creates a partitioner that chunks the user-specified range.
/// The lower, inclusive bound of the range.
/// The upper, exclusive bound of the range.
/// A partitioner.
/// The argument is
/// less than or equal to the argument.
public static OrderablePartitioner> Create(long fromInclusive, long toExclusive)
{
// How many chunks do we want to divide the range into? If this is 1, then the
// answer is "one chunk per core". Generally, though, you'll achieve better
// load balancing on a busy system if you make it higher than 1.
int coreOversubscriptionRate = 3;
if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
long rangeSize = (toExclusive - fromInclusive) /
(Environment.ProcessorCount * coreOversubscriptionRate);
if (rangeSize == 0) rangeSize = 1;
return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time
}
/// Creates a partitioner that chunks the user-specified range.
/// The lower, inclusive bound of the range.
/// The upper, exclusive bound of the range.
/// The size of each subrange.
/// A partitioner.
/// The argument is
/// less than or equal to the argument.
/// The argument is
/// less than or equal to 0.
public static OrderablePartitioner> Create(long fromInclusive, long toExclusive, long rangeSize)
{
if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize");
return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time
}
// Private method to parcel out range tuples.
private static IEnumerable> CreateRanges(long fromInclusive, long toExclusive, long rangeSize)
{
// Enumerate all of the ranges
long from, to;
bool shouldQuit = false;
for (long i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize)
{
from = i;
try { checked { to = i + rangeSize; } }
catch (OverflowException)
{
to = toExclusive;
shouldQuit = true;
}
if (to > toExclusive) to = toExclusive;
yield return new Tuple(from, to);
}
}
/// Creates a partitioner that chunks the user-specified range.
/// The lower, inclusive bound of the range.
/// The upper, exclusive bound of the range.
/// A partitioner.
/// The argument is
/// less than or equal to the argument.
public static OrderablePartitioner> Create(int fromInclusive, int toExclusive)
{
// How many chunks do we want to divide the range into? If this is 1, then the
// answer is "one chunk per core". Generally, though, you'll achieve better
// load balancing on a busy system if you make it higher than 1.
int coreOversubscriptionRate = 3;
if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
int rangeSize = (toExclusive - fromInclusive) /
(Environment.ProcessorCount * coreOversubscriptionRate);
if (rangeSize == 0) rangeSize = 1;
return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time
}
/// Creates a partitioner that chunks the user-specified range.
/// The lower, inclusive bound of the range.
/// The upper, exclusive bound of the range.
/// The size of each subrange.
/// A partitioner.
/// The argument is
/// less than or equal to the argument.
/// The argument is
/// less than or equal to 0.
public static OrderablePartitioner> Create(int fromInclusive, int toExclusive, int rangeSize)
{
if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive");
if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize");
return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time
}
// Private method to parcel out range tuples.
private static IEnumerable> CreateRanges(int fromInclusive, int toExclusive, int rangeSize)
{
// Enumerate all of the ranges
int from, to;
bool shouldQuit = false;
for (int i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize)
{
from = i;
try { checked { to = i + rangeSize; } }
catch (OverflowException)
{
to = toExclusive;
shouldQuit = true;
}
if (to > toExclusive) to = toExclusive;
yield return new Tuple(from, to);
}
}
#endif
#region DynamicPartitionEnumerator_Abstract class
///
/// DynamicPartitionEnumerator_Abstract defines the enumerator for each partition for the dynamic load-balance
/// partitioning algorithm.
/// - Partition is an enumerator of KeyValuePairs, each corresponding to an item in the data source:
/// the key is the index in the source collection; the value is the item itself.
/// - a set of such partitions share a reader over data source. The type of the reader is specified by
/// TSourceReader.
/// - each partition requests a contiguous chunk of elements at a time from the source data. The chunk
/// size is initially 1, and doubles every time until it reaches the maximum chunk size.
/// The implementation for GrabNextChunk() method has two versions: one for data source of IndexRange
/// types (IList and the array), one for data source of IEnumerable.
/// - The method "Reset" is not supported for any partitioning algorithm.
/// - The implementation for MoveNext() method is same for all dynanmic partitioners, so we provide it
/// in this abstract class.
///
/// Type of the elements in the data source
/// Type of the reader on the data source
//TSourceReader is
// - IList, when source data is IList, the shared reader is source data itself
// - TSource[], when source data is TSource[], the shared reader is source data itself
// - IEnumerator, when source data is IEnumerable, and the shared reader is an
// enumerator of the source data
private abstract class DynamicPartitionEnumerator_Abstract : IEnumerator>
{
//----------------- common fields and constructor for all dynamic partitioners -----------------
//--- shared by all dervied class with souce data type: IList, Array, and IEnumerator
protected readonly TSourceReader m_sharedReader;
protected static int s_defaultMaxChunkSize = GetDefaultChunkSize();
//deferred allocating in MoveNext() with initial value 0, to avoid false sharing
//we also use the fact that: (m_currentChunkSize==null) means MoveNext is never called on this enumerator
protected Shared m_currentChunkSize;
//deferring allocation in MoveNext() with initial value -1, to avoid false sharing
protected Shared m_localOffset;
private const int CHUNK_DOUBLING_RATE = 3; // Double the chunk size every this many grabs
private int m_doublingCountdown; // Number of grabs remaining until chunk size doubles
protected readonly int m_maxChunkSize; // Max chunk size specified by caller, or s_defaultMaxChunkSize
// m_sharedIndex shared by this set of partitions, and particularly when m_sharedReader is IEnuerable
// it serves as tracking of the natual order of elements in m_sharedReader
// the value of this field is passed in from outside (already initialized) by the constructor,
protected readonly Shared m_sharedIndex;
protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, Shared sharedIndex)
: this(sharedReader, sharedIndex, -1)
{
}
protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, Shared sharedIndex, int maxChunkSize)
{
Contract.Assert((maxChunkSize == -1) || (maxChunkSize > 0), "maxChunkSize 0 or < -1");
m_sharedReader = sharedReader;
m_sharedIndex = sharedIndex;
if (maxChunkSize == -1) m_maxChunkSize = s_defaultMaxChunkSize;
else m_maxChunkSize = maxChunkSize;
}
// ---------------- abstract method declarations --------------
///
/// Abstract method to request a contiguous chunk of elements from the source collection
///
/// specified number of elements requested
///
/// true if we successfully reserved at least one element (up to #=requestedChunkSize)
/// false if all elements in the source collection have been reserved.
///
//GrabNextChunk does the following:
// - grab # of requestedChunkSize elements from source data through shared reader,
// - at the time of function returns, m_currentChunkSize is updated with the number of
// elements actually got ----gined (<=requestedChunkSize).
// - GrabNextChunk returns true if at least one element is assigned to this partition;
// false if the shared reader already hits the last element of the source data before
// we call GrabNextChunk
protected abstract bool GrabNextChunk(int requestedChunkSize);
///
/// Abstract property, returns whether or not the shared reader has already read the last
/// element of the source data
///
protected abstract bool HasNoElementsLeft { get; set; }
///
/// Get the current element in the current partition. Property required by IEnumerator interface
/// This property is abstract because the implementation is different depending on the type
/// of the source data: IList, Array or IEnumerable
///
public abstract KeyValuePair Current { get; }
///
/// Dispose is abstract, and depends on the type of the source data:
/// - For source data type IList and Array, the type of the shared reader is just the dataitself.
/// We don't do anything in Dispose method for IList and Array.
/// - For source data type IEnumerable, the type of the shared reader is an enumerator we created.
/// Thus we need to dispose this shared reader enumerator, when there is no more active partitions
/// left.
///
public abstract void Dispose();
///
/// Reset on partitions is not supported
///
public void Reset()
{
throw new NotSupportedException();
}
///
/// Get the current element in the current partition. Property required by IEnumerator interface
///
Object IEnumerator.Current
{
get
{
return ((DynamicPartitionEnumerator_Abstract)this).Current;
}
}
///
/// Moves to the next element if any.
/// Try current chunk first, if the current chunk do not have any elements left, then we
/// attempt to grab a chunk from the source collection.
///
///
/// true if successfully moving to the next position;
/// false otherwise, if and only if there is no more elements left in the current chunk
/// AND the source collection is exhausted.
///
public bool MoveNext()
{
//perform deferred allocating of the local variables.
if (m_localOffset == null)
{
Contract.Assert(m_currentChunkSize == null);
m_localOffset = new Shared(-1);
m_currentChunkSize = new Shared(0);
m_doublingCountdown = CHUNK_DOUBLING_RATE;
}
if (m_localOffset.Value < m_currentChunkSize.Value - 1)
//attempt to grab the next element from the local chunk
{
m_localOffset.Value++;
return true;
}
else
//otherwise it means we exhausted the local chunk
//grab a new chunk from the source enumerator
{
Contract.Assert(m_localOffset.Value == m_currentChunkSize.Value - 1);
//set the requested chunk size to a proper value
int requestedChunkSize;
if (m_currentChunkSize.Value == 0) //first time grabbing from source enumerator
{
requestedChunkSize = 1;
}
else if (m_doublingCountdown > 0)
{
requestedChunkSize = m_currentChunkSize.Value;
}
else
{
requestedChunkSize = Math.Min(m_currentChunkSize.Value * 2, m_maxChunkSize);
m_doublingCountdown = CHUNK_DOUBLING_RATE; // reset
}
// Decrement your doubling countdown
m_doublingCountdown--;
Contract.Assert(requestedChunkSize > 0 && requestedChunkSize <= m_maxChunkSize);
//GrabNextChunk will update the value of m_currentChunkSize
if (GrabNextChunk(requestedChunkSize))
{
Contract.Assert(m_currentChunkSize.Value <= requestedChunkSize && m_currentChunkSize.Value > 0);
m_localOffset.Value = 0;
return true;
}
else
{
return false;
}
}
}
}
#endregion
#region Dynamic Partitioner for source data of IEnuemrable<> type
///
/// Inherits from DynamicPartitioners
/// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
/// of EnumerableOfPartitionsForIEnumerator defined internally
///
/// Type of elements in the source data
private class DynamicPartitionerForIEnumerable : OrderablePartitioner
{
IEnumerable m_source;
int m_maxChunkSize; // a value of -1 means "use default"
//constructor
internal DynamicPartitionerForIEnumerable(IEnumerable source, int maxChunkSize)
: base(true, false, true)
{
m_source = source;
m_maxChunkSize = maxChunkSize;
}
///
/// Overrides OrderablePartitioner.GetOrderablePartitions.
/// Partitions the underlying collection into the given number of orderable partitions.
///
/// number of partitions requested
/// A list containing enumerators.
override public IList>> GetOrderablePartitions(int partitionCount)
{
if (partitionCount <= 0)
{
throw new ArgumentOutOfRangeException("partitionCount");
}
IEnumerator>[] partitions
= new IEnumerator>[partitionCount];
IEnumerable> partitionEnumerable = new InternalPartitionEnumerable(m_source.GetEnumerator(), m_maxChunkSize);
for (int i = 0; i < partitionCount; i++)
{
partitions[i] = partitionEnumerable.GetEnumerator();
}
return partitions;
}
///
/// Overrides OrderablePartitioner.GetOrderableDyanmicPartitions
///
/// a enumerable collection of orderable partitions
override public IEnumerable> GetOrderableDynamicPartitions()
{
return new InternalPartitionEnumerable(m_source.GetEnumerator(), m_maxChunkSize);
}
///
/// Whether additional partitions can be created dynamically.
///
override public bool SupportsDynamicPartitions
{
get { return true; }
}
#region Internal classes: InternalPartitionEnumerable, InternalPartitionEnumerator
///
/// Provides customized implementation for source data of IEnumerable
/// Different from the counterpart for IList/Array, this enumerable maintains several additional fields
/// shared by the partitions it owns, including a boolean "m_hasNoElementsLef", a shared lock, and a
/// shared count "m_activePartitionCount"
///
private class InternalPartitionEnumerable : IEnumerable>, IDisposable
{
//reader through which we access the source data
private readonly IEnumerator m_sharedReader;
private Shared m_sharedIndex;//initial value -1
//fields shared by all partitions that this Enumerable owns
private Shared m_hasNoElementsLeft;//deferring allocation by enumerator
//shared synchronization lock, created by this Enumerable
private object m_sharedLock;//deferring allocation by enumerator
private bool m_disposed;
private Shared m_activePartitionCount;
private readonly int m_maxChunkSize;
internal InternalPartitionEnumerable(IEnumerator sharedReader, int maxChunkSize)
{
m_sharedReader = sharedReader;
m_sharedIndex = new Shared(-1);
m_hasNoElementsLeft = new Shared(false);
m_sharedLock = new object();
m_activePartitionCount = new Shared(0);
m_maxChunkSize = maxChunkSize;
}
public IEnumerator> GetEnumerator()
{
if (m_disposed)
{
throw new ObjectDisposedException(Environment.GetResourceString("PartitionerStatic_CanNotCallGetEnumeratorAfterSourceHasBeenDisposed"));
}
else
{
return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex,
m_hasNoElementsLeft, m_sharedLock, m_activePartitionCount, this, m_maxChunkSize);
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return ((InternalPartitionEnumerable)this).GetEnumerator();
}
public void Dispose()
{
if (!m_disposed)
{
m_disposed = true;
m_sharedReader.Dispose();
}
}
}
///
/// Inherits from DynamicPartitionEnumerator_Abstract directly
/// Provides customized implementation for: GrabNextChunk, HasNoElementsLeft, Current, Dispose
///
private class InternalPartitionEnumerator : DynamicPartitionEnumerator_Abstract>
{
//---- fields ----
//cached local copy of the current chunk
private KeyValuePair[] m_localList; //defer allocating to avoid false sharing
// the values of the following two fields are passed in from
// outside(already initialized) by the constructor,
private readonly Shared m_hasNoElementsLeft;
private readonly object m_sharedLock;
private readonly Shared m_activePartitionCount;
private InternalPartitionEnumerable m_enumerable;
//constructor
internal InternalPartitionEnumerator(
IEnumerator sharedReader,
Shared sharedIndex,
Shared hasNoElementsLeft,
object sharedLock,
Shared activePartitionCount,
InternalPartitionEnumerable enumerable,
int maxChunkSize)
: base(sharedReader, sharedIndex, maxChunkSize)
{
m_hasNoElementsLeft = hasNoElementsLeft;
m_sharedLock = sharedLock;
m_enumerable = enumerable;
m_activePartitionCount = activePartitionCount;
Interlocked.Increment(ref m_activePartitionCount.Value);
}
//overriding methods
///
/// Reserves a contiguous range of elements from source data
///
/// specified number of elements requested
///
/// true if we successfully reserved at least one element (up to #=requestedChunkSize)
/// false if all elements in the source collection have been reserved.
///
override protected bool GrabNextChunk(int requestedChunkSize)
{
Contract.Assert(requestedChunkSize > 0);
if (HasNoElementsLeft)
{
return false;
}
else
{
lock (m_sharedLock)
{
if (HasNoElementsLeft)
{
return false;
}
else
{
try
{
int actualChunkSize;
//enumerate over source data until either we got #requestedChunkSize of elements or
//MoveNext returns false
for (actualChunkSize = 0; actualChunkSize < requestedChunkSize; actualChunkSize++)
{
if (m_sharedReader.MoveNext())
{
//defer allocating to avoid false sharing
if (m_localList == null)
{
m_localList = new KeyValuePair[m_maxChunkSize];
}
Contract.Assert(m_sharedIndex != null); //already been allocated in MoveNext() before calling GrabNextChunk
m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
m_localList[actualChunkSize]
= new KeyValuePair(m_sharedIndex.Value,
m_sharedReader.Current);
}
else
{
//if MoveNext() return false, we set the flag to inform other partitions
HasNoElementsLeft = true;
break;
}
}
if (actualChunkSize > 0)
{
m_currentChunkSize.Value = actualChunkSize;
return true;
}
else
{
return false;
}
}
catch
{
// If an exception occurs, don't let the other enumerators try to enumerate.
// NOTE: this could instead throw an InvalidOperationException, but that would be unexpected
// and not helpful to the end user. We know the root cause is being communicated already.)
HasNoElementsLeft = true;
throw;
}
}
}
}
}
///
/// Returns whether or not the shared reader has already read the last
/// element of the source data
///
///
/// We cannot call m_sharedReader.MoveNext(), to see if it hits the last element
/// or not, because we can't undo MoveNext(). Thus we need to maintain a shared
/// boolean value m_hasNoElementsLeft across all partitions
///
override protected bool HasNoElementsLeft
{
get { return m_hasNoElementsLeft.Value; }
set
{
//we only set it from false to true once
//we should never set it back in any circumstances
Contract.Assert(value);
Contract.Assert(!m_hasNoElementsLeft.Value);
m_hasNoElementsLeft.Value = true;
}
}
override public KeyValuePair Current
{
get
{
//verify that MoveNext is at least called once before Current is called
if (m_currentChunkSize == null)
{
throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
}
Contract.Assert(m_localList != null);
Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
return (m_localList[m_localOffset.Value]);
}
}
///
/// If the current partition is to be disposed, we decrement the number of active partitions
/// for the shared reader.
/// If the number of active partitions becomes 0, we need to dispose the shared reader we created
///
override public void Dispose()
{
if (Interlocked.Decrement(ref m_activePartitionCount.Value) == 0)
{
m_enumerable.Dispose();
}
}
}
#endregion
}
#endregion
#region Dynamic Partitioner for source data of IndexRange types (IList<> and Array<>)
///
/// Dynamic load-balance partitioner. This class is abstract and to be derived from by
/// the customized partitioner classes for IList, Array, and IEnumerable
///
/// Type of the elements in the source data
/// Type of the source data collection
private abstract class DynamicPartitionerForIndexRange_Abstract : OrderablePartitioner
{
// TCollection can be: IList, TSource[] and IEnumerable
// Derived classes specify TCollection, and implement the abstract method GetOrderableDynamicPartitions_Factory accordingly
TCollection m_data;
///
/// Constructs a new orderable partitioner
///
/// source data collection
protected DynamicPartitionerForIndexRange_Abstract(TCollection data)
: base(true, false, true)
{
m_data = data;
}
///
/// Partition the source data and create an enumerable over the resulting partitions.
///
/// the source data collection
/// an enumerable of partitions of
protected abstract IEnumerable> GetOrderableDynamicPartitions_Factory(TCollection data);
///
/// Overrides OrderablePartitioner.GetOrderablePartitions.
/// Partitions the underlying collection into the given number of orderable partitions.
///
/// number of partitions requested
/// A list containing enumerators.
override public IList>> GetOrderablePartitions(int partitionCount)
{
if (partitionCount <= 0)
{
throw new ArgumentOutOfRangeException("partitionCount");
}
IEnumerator>[] partitions
= new IEnumerator>[partitionCount];
IEnumerable> partitionEnumerable = GetOrderableDynamicPartitions_Factory(m_data);
for (int i = 0; i < partitionCount; i++)
{
partitions[i] = partitionEnumerable.GetEnumerator();
}
return partitions;
}
///
/// Overrides OrderablePartitioner.GetOrderableDyanmicPartitions
///
/// a enumerable collection of orderable partitions
override public IEnumerable> GetOrderableDynamicPartitions()
{
return GetOrderableDynamicPartitions_Factory(m_data);
}
///
/// Whether additional partitions can be created dynamically.
///
override public bool SupportsDynamicPartitions
{
get { return true; }
}
}
///
/// Defines dynamic partition for source data of IList and Array.
/// This class inherits DynamicPartitionEnumerator_Abstract
/// - implements GrabNextChunk, HasNoElementsLeft, and Dispose methods for IList and Array
/// - Current property still remains abstract, implementation is different for IList and Array
/// - introduces another abstract method SourceCount, which returns the number of elements in
/// the source data. Implementation differs for IList and Array
///
/// Type of the elements in the data source
/// Type of the reader on the source data
private abstract class DynamicPartitionEnumeratorForIndexRange_Abstract : DynamicPartitionEnumerator_Abstract
{
//fields
protected int m_startIndex; //initially zero
//constructor
protected DynamicPartitionEnumeratorForIndexRange_Abstract(TSourceReader sharedReader, Shared sharedIndex)
: base(sharedReader, sharedIndex)
{
}
//abstract methods
//the Current property is still abstract, and will be implemented by derived classes
//we add another abstract method SourceCount to get the number of elements from the source reader
///
/// Get the number of elements from the source reader.
/// It calls IList.Count or Array.Length
///
protected abstract int SourceCount { get; }
//overriding methods
///
/// Reserves a contiguous range of elements from source data
///
/// specified number of elements requested
///
/// true if we successfully reserved at least one element (up to #=requestedChunkSize)
/// false if all elements in the source collection have been reserved.
///
override protected bool GrabNextChunk(int requestedChunkSize)
{
Contract.Assert(requestedChunkSize > 0);
while (!HasNoElementsLeft)
{
Contract.Assert(m_sharedIndex != null);
long oldSharedIndex = m_sharedIndex.Value;
if (HasNoElementsLeft)
{
//HasNoElementsLeft situation changed from false to true immediately
//and oldSharedIndex becomes stale
return false;
}
//there won't be overflow, because the index of IList/array is int, and we
//have casted it to long.
long newSharedIndex = Math.Min(SourceCount - 1, oldSharedIndex + requestedChunkSize);
//the following CAS, if successful, reserves a chunk of elements [oldSharedIndex+1, newSharedIndex]
//inclusive in the source collection
if (Interlocked.CompareExchange(ref m_sharedIndex.Value, newSharedIndex, oldSharedIndex)
== oldSharedIndex)
{
//set up local indexes.
//m_currentChunkSize is always set to requestedChunkSize when source data had
//enough elements of what we requested
m_currentChunkSize.Value = (int)(newSharedIndex - oldSharedIndex);
m_localOffset.Value = -1;
m_startIndex = (int)(oldSharedIndex + 1);
return true;
}
}
//didn't get any element, return false;
return false;
}
///
/// Returns whether or not the shared reader has already read the last
/// element of the source data
///
override protected bool HasNoElementsLeft
{
get
{
Contract.Assert(m_sharedIndex != null);
return m_sharedIndex.Value >= SourceCount - 1;
}
set
{
Contract.Assert(false);
}
}
///
/// For source data type IList and Array, the type of the shared reader is just the data itself.
/// We don't do anything in Dispose method for IList and Array.
///
override public void Dispose()
{ }
}
///
/// Inherits from DynamicPartitioners
/// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
/// of EnumerableOfPartitionsForIList defined internally
///
/// Type of elements in the source data
private class DynamicPartitionerForIList : DynamicPartitionerForIndexRange_Abstract>
{
//constructor
internal DynamicPartitionerForIList(IList source)
: base(source)
{ }
//override methods
override protected IEnumerable> GetOrderableDynamicPartitions_Factory(IList m_data)
{
//m_data itself serves as shared reader
return new InternalPartitionEnumerable(m_data);
}
///
/// Inherits from PartitionList_Abstract
/// Provides customized implementation for source data of IList
///
private class InternalPartitionEnumerable : IEnumerable>
{
//reader through which we access the source data
private readonly IList m_sharedReader;
private Shared m_sharedIndex;
internal InternalPartitionEnumerable(IList sharedReader)
{
m_sharedReader = sharedReader;
m_sharedIndex = new Shared(-1);
}
public IEnumerator> GetEnumerator()
{
return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
}
IEnumerator IEnumerable.GetEnumerator()
{
return ((InternalPartitionEnumerable)this).GetEnumerator();
}
}
///
/// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract
/// Provides customized implementation of SourceCount property and Current property for IList
///
private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract>
{
//constructor
internal InternalPartitionEnumerator(IList sharedReader, Shared sharedIndex)
: base(sharedReader, sharedIndex)
{ }
//overriding methods
override protected int SourceCount
{
get { return m_sharedReader.Count; }
}
///
/// return a KeyValuePair of the current element and its key
///
override public KeyValuePair Current
{
get
{
//verify that MoveNext is at least called once before Current is called
if (m_currentChunkSize == null)
{
throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext"));
}
Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value);
return new KeyValuePair(m_startIndex + m_localOffset.Value,
m_sharedReader[m_startIndex + m_localOffset.Value]);
}
}
}
}
///
/// Inherits from DynamicPartitioners
/// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance
/// of EnumerableOfPartitionsForArray defined internally
///
/// Type of elements in the source data
private class DynamicPartitionerForArray : DynamicPartitionerForIndexRange_Abstract
{
//constructor
internal DynamicPartitionerForArray(TSource[] source)
: base(source)
{ }
//override methods
override protected IEnumerable> GetOrderableDynamicPartitions_Factory(TSource[] m_data)
{
return new InternalPartitionEnumerable(m_data);
}
///
/// Inherits from PartitionList_Abstract
/// Provides customized implementation for source data of Array
///
private class InternalPartitionEnumerable : IEnumerable>
{
//reader through which we access the source data
private readonly TSource[] m_sharedReader;
private Shared