Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / QueryOperator.cs / 1305376 / QueryOperator.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// QueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;
namespace System.Linq.Parallel
{
/// <summary>
/// This is the abstract base class for all query operators in the system. It
/// implements the ParallelQuery{T} type so that it can be bound as the source
/// of parallel queries and so that it can be returned as the result of parallel query
/// operations. Not much is in here, although it does serve as the "entry point" for
/// opening all query operators: it will lazily analyze and cache a plan the first
/// time the tree is opened, and will open the tree upon calls to GetEnumerator.
/// </summary>
/// <remarks>
/// This class implements ParallelQuery so that any parallel query operator
/// can bind to the parallel query provider overloads. This allows us to string
/// together operators without the user always specifying AsParallel, e.g.
/// Select(Where(..., ...), ...), and so forth.
/// </remarks>
internal abstract class QueryOperator : ParallelQuery
{
// Whether the output of this operator is ordered; initialized from the isOrdered
// constructor argument and exposed through OutputOrdered.
protected bool m_outputOrdered;

//---------------------------------------------------------------------------------------
// Constructs an operator whose output is not ordered.
//
// Arguments:
//     settings - the query settings, forwarded to the base class constructor
//
internal QueryOperator(QuerySettings settings)
    : this(isOrdered: false, settings: settings)
{
}

//---------------------------------------------------------------------------------------
// Constructs an operator and records whether its output is ordered.
//
// Arguments:
//     isOrdered - value stored in m_outputOrdered
//     settings  - the query settings, forwarded to the base class constructor
//
internal QueryOperator(bool isOrdered, QuerySettings settings)
    : base(settings)
{
    this.m_outputOrdered = isOrdered;
}
//----------------------------------------------------------------------------------------
// Opening the query operator will do whatever is necessary to begin enumerating its
// results. This includes in some cases actually introducing parallelism, enumerating
// other query operators, and so on. This is abstract and left to the specific concrete
// operator classes to implement.
//
// Arguments:
//     settings       - various flags and settings to control query execution
//     preferStriping - flag representing whether the caller prefers striped partitioning
//                      over range partitioning
//
// Return Values:
//     A QueryResults object. Per the original notes it stands for either a single
//     enumerator, or a partition (for partition parallelism).
//
// Notes:
//     Within this class, Open is invoked by GetQueryResults on the root operator with
//     preferStriping set to false.
//
internal abstract QueryResults Open(QuerySettings settings, bool preferStriping);
//---------------------------------------------------------------------------------------
// The GetEnumerator method is the standard IEnumerable mechanism for walking the
// contents of a query. Note that GetEnumerator is only ever called on the root node:
// we then proceed by calling Open on all of the subsequent query nodes.
//
// Arguments:
//     mergeOptions              - the merge (buffering) option for the query output,
//                                 i.e. whether the returned enumerator will pipeline
//                                 (return control to the caller when the query is
//                                 spawned) or not (use the calling thread to execute
//                                 the query). Null leaves the choice unspecified. Note
//                                 that there are some conditions during which this hint
//                                 will be ignored -- currently, that happens only if a
//                                 sort is found anywhere in the query graph.
//     suppressOrderPreservation - whether to shut order preservation off, regardless
//                                 of the contents of the query
//
// Return Value:
//     An enumerator that retrieves elements from the query output.
//
// Notes:
//     The default mode of execution is to pipeline the query execution with respect
//     to the GetEnumerator caller (aka the consumer). Overloads are available
//     that can be used to override the default with an explicit choice.
//
public override IEnumerator GetEnumerator()
{
    // Leave the merge (buffering) option unspecified and keep order preservation on.
    return GetEnumerator(mergeOptions: null, suppressOrderPreservation: false);
}
public IEnumerator GetEnumerator(ParallelMergeOptions? mergeOptions)
{
    // Forward the caller's merge option unchanged; order preservation is not
    // suppressed by default.
    return GetEnumerator(mergeOptions, suppressOrderPreservation: false);
}
//---------------------------------------------------------------------------------------
// Is the output of this operator ordered? Exposes the m_outputOrdered flag.
//
internal bool OutputOrdered
{
    get
    {
        return this.m_outputOrdered;
    }
}
internal virtual IEnumerator GetEnumerator(ParallelMergeOptions? mergeOptions, bool suppressOrderPreservation)
{
    // Hand back a placeholder enumerator rather than opening the query here. It calls
    // back into GetOpenedEnumerator() on 'this' QueryOperator the first time the user
    // calls MoveNext(), so the query never executes if MoveNext() is never called.
    var deferredEnumerator = new QueryOpeningEnumerator(this, mergeOptions, suppressOrderPreservation);
    return deferredEnumerator;
}
//---------------------------------------------------------------------------------------
// The GetOpenedEnumerator method returns an enumerator that walks the contents of a
// query. The enumerator will be "opened", which means that PLINQ will start executing
// the query immediately, even before the user calls MoveNext() for the first time.
//
// Arguments:
//     mergeOptions  - merge option for the output; when null, querySettings.MergeOptions
//                     is used instead
//     suppressOrder - when true, the merge is unordered even if OutputOrdered is set
//     forEffect     - forwarded to the merger; when true on the parallel path, no
//                     enumerator is produced and null is returned
//     querySettings - the settings for this execution
//
// Return Value:
//     The sequential-fallback enumerator, the merged output enumerator, or null when
//     forEffect is true on the parallel path.
//
internal IEnumerator GetOpenedEnumerator(ParallelMergeOptions? mergeOptions, bool suppressOrder, bool forEffect,
    QuerySettings querySettings)
{
    // If the top-level enumerator forces a premature merge, run the query sequentially.
    bool runSequentially =
        querySettings.ExecutionMode.Value == ParallelExecutionMode.Default && LimitsParallelism;
    if (runSequentially)
    {
        var sequentialQuery = AsSequentialQuery(querySettings.CancellationState.ExternalCancellationToken);
        return ExceptionAggregator.WrapEnumerable(sequentialQuery, querySettings.CancellationState).GetEnumerator();
    }

    // Open the operator tree before anything else on the parallel path.
    var openedResults = GetQueryResults(querySettings);

    // Fall back to the merge option carried by the query settings.
    if (!mergeOptions.HasValue)
    {
        mergeOptions = querySettings.MergeOptions;
    }

    Contract.Assert(mergeOptions != null);

    // Top-level pre-emptive cancellation test.
    // This handles situations where cancellation has occurred before execution commences.
    // The handling for in-execution occurs in QueryTaskGroupState.QueryEnd()
    if (querySettings.CancellationState.MergedCancellationToken.IsCancellationRequested)
    {
        // Attach the external token to the exception only when it is the one that fired.
        throw querySettings.CancellationState.ExternalCancellationToken.IsCancellationRequested
            ? new OperationCanceledException(querySettings.CancellationState.ExternalCancellationToken)
            : new OperationCanceledException();
    }

    bool mergeInOrder = !suppressOrder && OutputOrdered;

    var merger = new PartitionedStreamMerger(
        forEffect,
        mergeOptions.GetValueOrDefault(),
        querySettings.TaskScheduler,
        mergeInOrder,
        querySettings.CancellationState,
        querySettings.QueryId);

    // Hook up the data flow between the operator-executors, starting from the merger.
    openedResults.GivePartitionedStream(merger);

    if (!forEffect)
    {
        return merger.MergeExecutor.GetEnumerator();
    }

    // A for-effect execution has no output to enumerate.
    return null;
}
// Called only once, on the 'head operator' -- the last operator specified in the query.
// From here Open() is used recursively to prepare this operator and the other
// enumerators.
private QueryResults GetQueryResults(QuerySettings querySettings)
{
    TraceHelpers.TraceInfo(
        "[timing]: {0}: starting execution - QueryOperator<>::GetQueryResults",
        DateTime.Now.Ticks);

    // Every mandatory query setting has to be filled in by this point.
    Contract.Assert(querySettings.TaskScheduler != null);
    Contract.Assert(querySettings.DegreeOfParallelism.HasValue);
    Contract.Assert(querySettings.ExecutionMode.HasValue);

    // Open the root operator of the query tree; no preference for striped partitioning.
    return Open(querySettings, preferStriping: false);
}
//----------------------------------------------------------------------------------------
// Executes the query and returns the results in an array.
//
// Notes:
//     The logical query execution is bracketed by QueryLifecycle begin/end calls; the
//     end call runs in a finally block. CleanStateAtQueryEnd is invoked only after a
//     parallel execution has produced its output array.
//
internal TOutput[] ExecuteAndGetResultsAsArray()
{
    QuerySettings querySettings = SpecifiedQuerySettings.WithPerExecutionSettings().WithDefaults();

    QueryLifecycle.LogicalQueryExecutionBegin(querySettings.QueryId);
    try
    {
        // A premature merge at the top level means the query is run sequentially
        // (default execution mode only).
        bool runSequentially =
            querySettings.ExecutionMode.Value == ParallelExecutionMode.Default && LimitsParallelism;
        if (runSequentially)
        {
            IEnumerable sequentialQuery = AsSequentialQuery(querySettings.CancellationState.ExternalCancellationToken);
            IEnumerable cancellableQuery = CancellableEnumerable.Wrap(sequentialQuery, querySettings.CancellationState.ExternalCancellationToken);
            return ExceptionAggregator.WrapEnumerable(cancellableQuery, querySettings.CancellationState).ToArray();
        }

        var openedResults = GetQueryResults(querySettings);

        TOutput[] output;
        if (openedResults.IsIndexible && OutputOrdered)
        {
            // The special array-based merge performs better if the output is ordered,
            // because it does not have to pay for ordering. In the unordered case, it
            // appears that the stop-and-go merge performs a little better.
            var arrayMerger = new ArrayMergeHelper(SpecifiedQuerySettings, openedResults);
            arrayMerger.Execute();
            output = arrayMerger.GetResultsAsArray();
        }
        else
        {
            var streamMerger = new PartitionedStreamMerger(
                false,
                ParallelMergeOptions.FullyBuffered,
                querySettings.TaskScheduler,
                OutputOrdered,
                querySettings.CancellationState,
                querySettings.QueryId);
            openedResults.GivePartitionedStream(streamMerger);
            output = streamMerger.MergeExecutor.GetResultsAsArray();
        }

        // Both parallel paths clean up the per-execution state once the array is ready.
        querySettings.CleanStateAtQueryEnd();
        return output;
    }
    finally
    {
        QueryLifecycle.LogicalQueryExecutionEnd(querySettings.QueryId);
    }
}
//---------------------------------------------------------------------------------------
// Returns an enumerable that represents the query executing sequentially.
//
// Arguments:
//     token - a cancellation token; the callers in this class pass the external
//             cancellation token taken from the query settings' cancellation state
//
// Note that iterating the returned enumerable will not wrap exceptions in an
// AggregateException. Before this enumerable is returned to the user, we must wrap it
// with an ExceptionAggregator.
//
internal abstract IEnumerable AsSequentialQuery(CancellationToken token);
//----------------------------------------------------------------------------------------
// Whether this operator performs a premature merge.
//
// When this is true and the execution mode is ParallelExecutionMode.Default,
// GetOpenedEnumerator and ExecuteAndGetResultsAsArray run the query sequentially
// through AsSequentialQuery instead of opening the operator tree.
//
internal abstract bool LimitsParallelism { get; }
//----------------------------------------------------------------------------------------
// The state of the order index of the results returned by this operator.
//
// Supplied by each concrete operator; this base class does not read it itself.
//
internal abstract OrdinalIndexState OrdinalIndexState { get; }
//---------------------------------------------------------------------------------------
// A helper method that executes the query rooted at the openedChild operator, and returns
// the results as ListQueryResults.
//
// Arguments:
//     openedChild    - the already-opened partitioned stream to execute
//     partitionCount - forwarded to the ListQueryResults constructor
//     outputOrdered  - forwarded to the merge executor
//     useStriping    - forwarded to the ListQueryResults constructor
//     settings       - supplies the task scheduler, cancellation state and query id
//
// Notes:
//     The merge always uses ParallelMergeOptions.FullyBuffered.
//
internal static ListQueryResults ExecuteAndCollectResults(
    PartitionedStream openedChild,
    int partitionCount,
    bool outputOrdered,
    bool useStriping,
    QuerySettings settings)
{
    TaskScheduler scheduler = settings.TaskScheduler;

    MergeExecutor mergeExecutor = MergeExecutor.Execute(
        openedChild,
        false,
        ParallelMergeOptions.FullyBuffered,
        scheduler,
        outputOrdered,
        settings.CancellationState,
        settings.QueryId);

    var collected = mergeExecutor.GetResultsAsArray();
    return new ListQueryResults(collected, partitionCount, useStriping);
}
//----------------------------------------------------------------------------------------
// Returns a QueryOperator for any IEnumerable data source. If the source already is a
// query operator, the very same reference is returned after a cast; otherwise a scan
// operation is allocated on top of the source and returned.
//
// Arguments:
//     source - any enumerable data source to be wrapped
//
// Return Value:
//     A query operator.
//
internal static QueryOperator AsQueryOperator(IEnumerable source)
{
    Contract.Assert(source != null);

    // First try a plain cast, for the case where our child is just another
    // query operator.
    var result = source as QueryOperator;
    if (result == null)
    {
        // OrderedParallelQuery needs special handling. In all other cases,
        // ParallelQuery *is* the QueryOperator, but OrderedParallelQuery is not a
        // QueryOperator -- it only holds a reference to one. Ideally SortQueryOperator
        // would inherit from OrderedParallelQuery, but that conflicts with other
        // constraints on our class hierarchy.
        //
        // If the source is neither, it is a real piece of data, and a new scan
        // operator is constructed on top of it.
        var ordered = source as OrderedParallelQuery;
        result = (ordered != null)
            ? (QueryOperator)ordered.SortOperator
            : new ScanQueryOperator(source);
    }

    Contract.Assert(result != null);
    return result;
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- HeaderCollection.cs
- ListItemConverter.cs
- AttributeProviderAttribute.cs
- InvocationExpression.cs
- AuthStoreRoleProvider.cs
- TextSpanModifier.cs
- VisualProxy.cs
- ResourceKey.cs
- XmlAttributeOverrides.cs
- ItemCheckEvent.cs
- ConfigurationStrings.cs
- StaticResourceExtension.cs
- PlatformCulture.cs
- XmlDocumentFragment.cs
- OneOfElement.cs
- TypeDependencyAttribute.cs
- SecurityTokenTypes.cs
- CompositeControlDesigner.cs
- EqualityArray.cs
- RegisteredDisposeScript.cs
- AdjustableArrowCap.cs
- ContentPosition.cs
- Cursors.cs
- FlowDocumentReaderAutomationPeer.cs
- MetadataItemCollectionFactory.cs
- TrackingAnnotationCollection.cs
- HwndProxyElementProvider.cs
- ChannelCacheSettings.cs
- TreeNodeStyleCollection.cs
- ProcessHostServerConfig.cs
- DrawItemEvent.cs
- WindowsScroll.cs
- CompositeCollection.cs
- mediaeventargs.cs
- SamlAttribute.cs
- SoapWriter.cs
- DataGridViewTopLeftHeaderCell.cs
- DynamicResourceExtension.cs
- TriggerCollection.cs
- MediaSystem.cs
- DataGridCommandEventArgs.cs
- ControlBuilderAttribute.cs
- OracleFactory.cs
- __Filters.cs
- XPathBinder.cs
- InvokeMemberBinder.cs
- WebServiceMethodData.cs
- ComponentSerializationService.cs
- DefaultHttpHandler.cs
- sqlpipe.cs
- WebScriptMetadataFormatter.cs
- XPathNodeList.cs
- StandardToolWindows.cs
- SetIndexBinder.cs
- ClientBuildManager.cs
- NumericPagerField.cs
- ToolStripStatusLabel.cs
- StringHandle.cs
- SoapSchemaExporter.cs
- ResourceSet.cs
- WmpBitmapDecoder.cs
- ConfigUtil.cs
- SamlAssertionKeyIdentifierClause.cs
- XmlFormatWriterGenerator.cs
- DemultiplexingDispatchMessageFormatter.cs
- httpserverutility.cs
- InfoCardListRequest.cs
- UnmanagedMemoryStream.cs
- VirtualDirectoryMapping.cs
- XmlSerializerSection.cs
- ObjectCloneHelper.cs
- BitHelper.cs
- CustomErrorsSection.cs
- RubberbandSelector.cs
- ExtensionFile.cs
- ScrollableControlDesigner.cs
- XmlComment.cs
- LocatorBase.cs
- Viewport3DAutomationPeer.cs
- MessageHeaderDescriptionCollection.cs
- Point4D.cs
- CalendarData.cs
- BaseCodePageEncoding.cs
- TabItem.cs
- TemplateBaseAction.cs
- CompositeControl.cs
- CheckBoxAutomationPeer.cs
- Empty.cs
- TextTreeUndo.cs
- ProcessProtocolHandler.cs
- ModulesEntry.cs
- TextMarkerSource.cs
- WmfPlaceableFileHeader.cs
- HTMLTextWriter.cs
- PageHandlerFactory.cs
- LostFocusEventManager.cs
- EndPoint.cs
- FloaterParagraph.cs
- AssemblyName.cs
- XPathNavigatorKeyComparer.cs