// Source: .NET 4.0 RTM reference source
// ndp/fx/src/Core/System/Linq/Parallel/Merging/MergeExecutor.cs (changeset 1305376)
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// MergeExecutor.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;
namespace System.Linq.Parallel
{
///
/// Drives execution of an actual merge operation, including creating channel data
/// structures and scheduling parallel work as appropriate. The algorithms used
/// internally are parameterized based on the type of data in the partitions; e.g.
/// if an order preserved stream is found, the merge will automatically use an
/// order preserving merge, and so forth.
///
///
internal class MergeExecutor : IEnumerable
{
// Many internal algorithms are parameterized based on the data. The IMergeHelper
// is the pluggable interface whose implementations perform those algorithms.
private IMergeHelper m_mergeHelper;
// Private constructor. MergeExecutor should only be constructed via the
// MergeExecutor.Execute static method.
private MergeExecutor()
{
}
//------------------------------------------------------------------------------------
// Creates and executes a new merge executor object.
//
// Arguments:
// partitions - the partitions whose data will be merged into one stream
// ignoreOutput - if true, we are enumerating "for effect", and we won't actually
// generate data in the output stream
// pipeline - whether to use a pipelined merge or not.
// isOrdered - whether to perform an ordering merge.
//
internal static MergeExecutor Execute(
PartitionedStream partitions, bool ignoreOutput, ParallelMergeOptions options, TaskScheduler taskScheduler, bool isOrdered,
CancellationState cancellationState, int queryId)
{
Contract.Assert(partitions != null);
Contract.Assert(partitions.PartitionCount > 0);
Contract.Assert(!ignoreOutput || options == ParallelMergeOptions.FullyBuffered, "@BUGBUG: pipelining w/ no output not supported -- need it?");
MergeExecutor mergeExecutor = new MergeExecutor();
if (isOrdered && !ignoreOutput)
{
if (options != ParallelMergeOptions.FullyBuffered && !partitions.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing))
{
Contract.Assert(options == ParallelMergeOptions.NotBuffered || options == ParallelMergeOptions.AutoBuffered);
bool autoBuffered = (options == ParallelMergeOptions.AutoBuffered);
if (partitions.PartitionCount > 1)
{
// We use a pipelining ordered merge
mergeExecutor.m_mergeHelper = new OrderPreservingPipeliningMergeHelper(
(PartitionedStream)(object)partitions, taskScheduler, cancellationState, autoBuffered, queryId);
}
else
{
// When DOP=1, the default merge simply returns the single producer enumerator to the consumer. This way, ordering
// does not add any extra overhead, and no producer task needs to be scheduled.
mergeExecutor.m_mergeHelper = new DefaultMergeHelper(
partitions, false, options, taskScheduler, cancellationState, queryId);
}
}
else
{
// We use a stop-and-go ordered merge helper
mergeExecutor.m_mergeHelper = new OrderPreservingMergeHelper(partitions, taskScheduler, cancellationState, queryId);
}
}
else
{
// We use a default - unordered - merge helper.
mergeExecutor.m_mergeHelper = new DefaultMergeHelper(partitions, ignoreOutput, options, taskScheduler, cancellationState, queryId);
}
mergeExecutor.Execute();
return mergeExecutor;
}
//-----------------------------------------------------------------------------------
// Initiates execution of the merge.
//
private void Execute()
{
Contract.Assert(m_mergeHelper != null);
m_mergeHelper.Execute();
}
//-----------------------------------------------------------------------------------
// Returns an enumerator that will yield elements from the resulting merged data
// stream.
//
IEnumerator IEnumerable.GetEnumerator()
{
return ((IEnumerable)this).GetEnumerator();
}
public IEnumerator GetEnumerator()
{
Contract.Assert(m_mergeHelper != null);
return m_mergeHelper.GetEnumerator();
}
//-----------------------------------------------------------------------------------
// Returns the merged results as an array.
//
internal TInputOutput[] GetResultsAsArray()
{
return m_mergeHelper.GetResultsAsArray();
}
//------------------------------------------------------------------------------------
// This internal helper method is used to generate a set of asynchronous channels.
// The algorithm used by each channel contains the necessary synchronizationis to
// ensure it is suitable for pipelined consumption.
//
// Arguments:
// partitionsCount - the number of partitions for which to create new channels.
//
// Return Value:
// An array of asynchronous channels, one for each partition.
//
internal static AsynchronousChannel[] MakeAsynchronousChannels(int partitionCount, ParallelMergeOptions options, CancellationToken cancellationToken)
{
AsynchronousChannel[] channels = new AsynchronousChannel[partitionCount];
Contract.Assert(options == ParallelMergeOptions.NotBuffered || options == ParallelMergeOptions.AutoBuffered);
TraceHelpers.TraceInfo("MergeExecutor::MakeChannels: setting up {0} async channels in prep for pipeline", partitionCount);
// If we are pipelining, we need a channel that contains the necessary synchronization
// in it. We choose a bounded/blocking channel data structure: bounded so that we can
// limit the amount of memory overhead used by the query by putting a cap on the
// buffer size into which producers place data, and blocking so that the consumer can
// wait for additional data to arrive in the case that it's found to be empty.
int chunkSize = 0; // 0 means automatic chunk size
if (options == ParallelMergeOptions.NotBuffered)
{
chunkSize = 1;
}
for (int i = 0; i < channels.Length; i++)
{
channels[i] = new AsynchronousChannel(chunkSize, cancellationToken);
}
return channels;
}
//-----------------------------------------------------------------------------------
// This internal helper method is used to generate a set of synchronous channels.
// The channel data structure used has been optimized for sequential execution and
// does not support pipelining.
//
// Arguments:
// partitionsCount - the number of partitions for which to create new channels.
//
// Return Value:
// An array of synchronous channels, one for each partition.
//
internal static SynchronousChannel[] MakeSynchronousChannels(int partitionCount)
{
SynchronousChannel[] channels = new SynchronousChannel[partitionCount];
TraceHelpers.TraceInfo("MergeExecutor::MakeChannels: setting up {0} channels in prep for stop-and-go", partitionCount);
// We just build up the results in memory using simple, dynamically growable FIFO queues.
for (int i = 0; i < channels.Length; i++)
{
channels[i] = new SynchronousChannel();
}
return channels;
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// MergeExecutor.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;
namespace System.Linq.Parallel
{
/// <summary>
/// Drives execution of an actual merge operation, including creating channel data
/// structures and scheduling parallel work as appropriate. The algorithms used
/// internally are parameterized based on the type of data in the partitions; e.g.
/// if an order preserved stream is found, the merge will automatically use an
/// order preserving merge, and so forth.
/// </summary>
/// <typeparam name="TInputOutput">The element type of the partitions being merged.</typeparam>
// NOTE(review): this is a verbatim duplicate of the class above, introduced by the
// HTML scrape that produced this file — one of the two copies should be removed.
// The generic type parameters stripped by the scrape are restored here from the
// surviving uses of TInputOutput; verify against the original reference source.
internal class MergeExecutor<TInputOutput> : IEnumerable<TInputOutput>
{
    // Many internal algorithms are parameterized based on the data. The IMergeHelper
    // is the pluggable interface whose implementations perform those algorithms.
    private IMergeHelper<TInputOutput> m_mergeHelper;

    // Private constructor. MergeExecutor should only be constructed via the
    // MergeExecutor.Execute static method.
    private MergeExecutor()
    {
    }

    //------------------------------------------------------------------------------------
    // Creates and executes a new merge executor object.
    //
    // Arguments:
    //     partitions   - the partitions whose data will be merged into one stream
    //     ignoreOutput - if true, we are enumerating "for effect", and we won't actually
    //                    generate data in the output stream
    //     options      - whether to use a pipelined merge or not.
    //     isOrdered    - whether to perform an ordering merge.
    //
    internal static MergeExecutor<TInputOutput> Execute<TKey>(
        PartitionedStream<TInputOutput, TKey> partitions, bool ignoreOutput, ParallelMergeOptions options, TaskScheduler taskScheduler, bool isOrdered,
        CancellationState cancellationState, int queryId)
    {
        Contract.Assert(partitions != null);
        Contract.Assert(partitions.PartitionCount > 0);
        Contract.Assert(!ignoreOutput || options == ParallelMergeOptions.FullyBuffered, "@BUGBUG: pipelining w/ no output not supported -- need it?");

        MergeExecutor<TInputOutput> mergeExecutor = new MergeExecutor<TInputOutput>();
        if (isOrdered && !ignoreOutput)
        {
            if (options != ParallelMergeOptions.FullyBuffered && !partitions.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing))
            {
                Contract.Assert(options == ParallelMergeOptions.NotBuffered || options == ParallelMergeOptions.AutoBuffered);
                bool autoBuffered = (options == ParallelMergeOptions.AutoBuffered);

                if (partitions.PartitionCount > 1)
                {
                    // We use a pipelining ordered merge. The index-state check above guarantees
                    // the keys are increasing ordinals, so the key type can be reinterpreted
                    // as int via the (object) round-trip cast.
                    mergeExecutor.m_mergeHelper = new OrderPreservingPipeliningMergeHelper<TInputOutput>(
                        (PartitionedStream<TInputOutput, int>)(object)partitions, taskScheduler, cancellationState, autoBuffered, queryId);
                }
                else
                {
                    // When DOP=1, the default merge simply returns the single producer enumerator to the consumer. This way, ordering
                    // does not add any extra overhead, and no producer task needs to be scheduled.
                    mergeExecutor.m_mergeHelper = new DefaultMergeHelper<TInputOutput, TKey>(
                        partitions, false, options, taskScheduler, cancellationState, queryId);
                }
            }
            else
            {
                // We use a stop-and-go ordered merge helper
                mergeExecutor.m_mergeHelper = new OrderPreservingMergeHelper<TInputOutput, TKey>(partitions, taskScheduler, cancellationState, queryId);
            }
        }
        else
        {
            // We use a default - unordered - merge helper.
            mergeExecutor.m_mergeHelper = new DefaultMergeHelper<TInputOutput, TKey>(partitions, ignoreOutput, options, taskScheduler, cancellationState, queryId);
        }

        mergeExecutor.Execute();
        return mergeExecutor;
    }

    //-----------------------------------------------------------------------------------
    // Initiates execution of the merge.
    //
    private void Execute()
    {
        Contract.Assert(m_mergeHelper != null);
        m_mergeHelper.Execute();
    }

    //-----------------------------------------------------------------------------------
    // Returns an enumerator that will yield elements from the resulting merged data
    // stream.
    //
    IEnumerator IEnumerable.GetEnumerator()
    {
        // Delegate to the strongly typed enumerator. (Without the generic cast this
        // would re-dispatch to this same explicit implementation and recurse forever.)
        return ((IEnumerable<TInputOutput>)this).GetEnumerator();
    }

    public IEnumerator<TInputOutput> GetEnumerator()
    {
        Contract.Assert(m_mergeHelper != null);
        return m_mergeHelper.GetEnumerator();
    }

    //-----------------------------------------------------------------------------------
    // Returns the merged results as an array.
    //
    internal TInputOutput[] GetResultsAsArray()
    {
        return m_mergeHelper.GetResultsAsArray();
    }

    //------------------------------------------------------------------------------------
    // This internal helper method is used to generate a set of asynchronous channels.
    // The algorithm used by each channel contains the necessary synchronization to
    // ensure it is suitable for pipelined consumption.
    //
    // Arguments:
    //     partitionCount - the number of partitions for which to create new channels.
    //
    // Return Value:
    //     An array of asynchronous channels, one for each partition.
    //
    internal static AsynchronousChannel<TInputOutput>[] MakeAsynchronousChannels(int partitionCount, ParallelMergeOptions options, CancellationToken cancellationToken)
    {
        AsynchronousChannel<TInputOutput>[] channels = new AsynchronousChannel<TInputOutput>[partitionCount];

        Contract.Assert(options == ParallelMergeOptions.NotBuffered || options == ParallelMergeOptions.AutoBuffered);
        TraceHelpers.TraceInfo("MergeExecutor::MakeChannels: setting up {0} async channels in prep for pipeline", partitionCount);

        // If we are pipelining, we need a channel that contains the necessary synchronization
        // in it. We choose a bounded/blocking channel data structure: bounded so that we can
        // limit the amount of memory overhead used by the query by putting a cap on the
        // buffer size into which producers place data, and blocking so that the consumer can
        // wait for additional data to arrive in the case that it's found to be empty.
        int chunkSize = 0; // 0 means automatic chunk size
        if (options == ParallelMergeOptions.NotBuffered)
        {
            // Not buffered: producers hand elements to the consumer one at a time.
            chunkSize = 1;
        }

        for (int i = 0; i < channels.Length; i++)
        {
            channels[i] = new AsynchronousChannel<TInputOutput>(chunkSize, cancellationToken);
        }

        return channels;
    }

    //-----------------------------------------------------------------------------------
    // This internal helper method is used to generate a set of synchronous channels.
    // The channel data structure used has been optimized for sequential execution and
    // does not support pipelining.
    //
    // Arguments:
    //     partitionCount - the number of partitions for which to create new channels.
    //
    // Return Value:
    //     An array of synchronous channels, one for each partition.
    //
    internal static SynchronousChannel<TInputOutput>[] MakeSynchronousChannels(int partitionCount)
    {
        SynchronousChannel<TInputOutput>[] channels = new SynchronousChannel<TInputOutput>[partitionCount];

        TraceHelpers.TraceInfo("MergeExecutor::MakeChannels: setting up {0} channels in prep for stop-and-go", partitionCount);

        // We just build up the results in memory using simple, dynamically growable FIFO queues.
        for (int i = 0; i < channels.Length; i++)
        {
            channels[i] = new SynchronousChannel<TInputOutput>();
        }

        return channels;
    }
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- ExitEventArgs.cs
- DelayedRegex.cs
- ToolStripContentPanel.cs
- EntityDataSourceMemberPath.cs
- SymbolResolver.cs
- AsmxEndpointPickerExtension.cs
- QuaternionAnimationBase.cs
- ReadOnlyMetadataCollection.cs
- WebEvents.cs
- mda.cs
- XmlByteStreamWriter.cs
- SoapAttributeAttribute.cs
- FormView.cs
- CursorInteropHelper.cs
- SystemIPv6InterfaceProperties.cs
- MulticastIPAddressInformationCollection.cs
- CommentAction.cs
- FormViewInsertedEventArgs.cs
- dbenumerator.cs
- StateInitialization.cs
- XPathException.cs
- ReadOnlyKeyedCollection.cs
- WebPartCatalogAddVerb.cs
- Semaphore.cs
- DecimalAnimationUsingKeyFrames.cs
- SecurityDescriptor.cs
- UInt64.cs
- WindowsScrollBar.cs
- HashHelper.cs
- ObjectDataSourceMethodEventArgs.cs
- SymLanguageType.cs
- TimeoutHelper.cs
- ReverseInheritProperty.cs
- XmlReaderSettings.cs
- SByte.cs
- BitHelper.cs
- CodeIdentifier.cs
- X509ScopedServiceCertificateElement.cs
- HttpModulesSection.cs
- SqlTriggerAttribute.cs
- AspNetRouteServiceHttpHandler.cs
- UseManagedPresentationBindingElement.cs
- MulticastNotSupportedException.cs
- precedingquery.cs
- Zone.cs
- ClientOptions.cs
- DeferredTextReference.cs
- HtmlInputControl.cs
- DbConnectionPoolOptions.cs
- _ListenerResponseStream.cs
- EventLogPermissionAttribute.cs
- EntityStoreSchemaGenerator.cs
- ErrorLog.cs
- TreeNodeBindingCollection.cs
- StubHelpers.cs
- BaseConfigurationRecord.cs
- WebConfigurationFileMap.cs
- UniqueIdentifierService.cs
- NetworkStream.cs
- RemoteWebConfigurationHost.cs
- WebPartDisplayModeCollection.cs
- TypeConverterValueSerializer.cs
- RotateTransform.cs
- SystemColorTracker.cs
- SharedUtils.cs
- AccessorTable.cs
- XmlCountingReader.cs
- RowCache.cs
- StringAnimationBase.cs
- CheckBoxPopupAdapter.cs
- ItemsControl.cs
- SoapFormatExtensions.cs
- Ops.cs
- unitconverter.cs
- ReadOnlyDataSourceView.cs
- CompiledRegexRunner.cs
- WebUtil.cs
- NextPreviousPagerField.cs
- FrameworkElement.cs
- InputProviderSite.cs
- Geometry.cs
- DriveNotFoundException.cs
- Terminate.cs
- SignatureSummaryDialog.cs
- DataSourceCache.cs
- Add.cs
- CaseInsensitiveHashCodeProvider.cs
- fixedPageContentExtractor.cs
- SemanticBasicElement.cs
- TransformerInfoCollection.cs
- DbProviderFactory.cs
- namescope.cs
- GlyphTypeface.cs
- ProvideValueServiceProvider.cs
- BookmarkScopeInfo.cs
- DataKeyPropertyAttribute.cs
- OuterGlowBitmapEffect.cs
- BindableTemplateBuilder.cs
- OrderedParallelQuery.cs
- TripleDESCryptoServiceProvider.cs