Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / QueryOperator.cs / 1305376 / QueryOperator.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // QueryOperator.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using System.Diagnostics.Contracts; namespace System.Linq.Parallel { ////// This is the abstract base class for all query operators in the system. It /// implements the ParallelQuery{T} type so that it can be bound as the source /// of parallel queries and so that it can be returned as the result of parallel query /// operations. Not much is in here, although it does serve as the "entry point" for /// opening all query operators: it will lazily analyze and cache a plan the first /// time the tree is opened, and will open the tree upon calls to GetEnumerator. /// /// Notes: /// This class implements ParallelQuery so that any parallel query operator /// can bind to the parallel query provider overloads. This allows us to string /// together operators w/out the user always specifying AsParallel, e.g. /// Select(Where(..., ...), ...), and so forth. /// ///internal abstract class QueryOperator : ParallelQuery { protected bool m_outputOrdered; internal QueryOperator(QuerySettings settings) :this(false, settings) { } internal QueryOperator(bool isOrdered, QuerySettings settings) :base(settings) { m_outputOrdered = isOrdered; } //---------------------------------------------------------------------------------------- // Opening the query operator will do whatever is necessary to begin enumerating its // results. This includes in some cases actually introducing parallelism, enumerating // other query operators, and so on. This is abstract and left to the specific concrete // operator classes to implement. // // Arguments: // settings - various flags and settings to control query execution // preferStriping - flag representing whether the caller prefers striped partitioning // over range partitioning // // Return Values: // Either a single enumerator, or a partition (for partition parallelism). // internal abstract QueryResults Open(QuerySettings settings, bool preferStriping); //--------------------------------------------------------------------------------------- // The GetEnumerator method is the standard IEnumerable mechanism for walking the // contents of a query. Note that GetEnumerator is only ever called on the root node: // we then proceed by calling Open on all of the subsequent query nodes. // // Arguments: // usePipelining - whether the returned enumerator will pipeline (i.e. return // control to the caller when the query is spawned) or not // (i.e. use the calling thread to execute the query). Note // that there are some conditions during which this hint will // be ignored -- currently, that happens only if a sort is // found anywhere in the query graph. // suppressOrderPreservation - whether to shut order preservation off, regardless // of the contents of the query // // Return Value: // An enumerator that retrieves elements from the query output. // // Notes: // The default mode of execution is to pipeline the query execution with respect // to the GetEnumerator caller (aka the consumer). An overload is available // that can be used to override the default with an explicit choice. // public override IEnumerator GetEnumerator() { // Buffering is unspecified and order preservation is not suppressed. return GetEnumerator(null, false); } public IEnumerator GetEnumerator(ParallelMergeOptions? mergeOptions) { // Pass through the value supplied for pipelining, and do not suppress // order preservation by default. return GetEnumerator(mergeOptions, false); } //--------------------------------------------------------------------------------------- // Is the output of this operator ordered? // internal bool OutputOrdered { get { return m_outputOrdered; } } internal virtual IEnumerator GetEnumerator(ParallelMergeOptions? mergeOptions, bool suppressOrderPreservation) { // Return a dummy enumerator that will call back GetOpenedEnumerator() on 'this' QueryOperator // the first time the user calls MoveNext(). We do this to prevent executing the query if user // never calls MoveNext(). return new QueryOpeningEnumerator (this, mergeOptions, suppressOrderPreservation); } //--------------------------------------------------------------------------------------- // The GetOpenedEnumerator method return an enumerator that walks the contents of a query. // The enumerator will be "opened", which means that PLINQ will start executing the query // immediately, even before the user calls MoveNext() for the first time. // internal IEnumerator GetOpenedEnumerator(ParallelMergeOptions? mergeOptions, bool suppressOrder, bool forEffect, QuerySettings querySettings) { // If the top-level enumerator forces a premature merge, run the query sequentially. if (querySettings.ExecutionMode.Value == ParallelExecutionMode.Default && LimitsParallelism) { IEnumerable opSequential = AsSequentialQuery(querySettings.CancellationState.ExternalCancellationToken); return ExceptionAggregator.WrapEnumerable(opSequential, querySettings.CancellationState).GetEnumerator(); } QueryResults queryResults = GetQueryResults(querySettings); if (mergeOptions == null) { mergeOptions = querySettings.MergeOptions; } Contract.Assert(mergeOptions != null); // Top-level pre-emptive cancellation test. // This handles situations where cancellation has occured before execution commences // The handling for in-execution occurs in QueryTaskGroupState.QueryEnd() if(querySettings.CancellationState.MergedCancellationToken.IsCancellationRequested) { if (querySettings.CancellationState.ExternalCancellationToken.IsCancellationRequested) throw new OperationCanceledException(querySettings.CancellationState.ExternalCancellationToken); else throw new OperationCanceledException(); } bool orderedMerge = OutputOrdered && !suppressOrder; PartitionedStreamMerger merger = new PartitionedStreamMerger (forEffect, mergeOptions.GetValueOrDefault(), querySettings.TaskScheduler, orderedMerge, querySettings.CancellationState, querySettings.QueryId); queryResults.GivePartitionedStream(merger); // hook up the data flow between the operator-executors, starting from the merger. if (forEffect) { return null; } return merger.MergeExecutor.GetEnumerator(); } // This method is called only once on the 'head operator' which is the last specified operator in the query // This method then recursively uses Open() to prepare itself and the other enumerators. private QueryResults GetQueryResults(QuerySettings querySettings) { TraceHelpers.TraceInfo("[timing]: {0}: starting execution - QueryOperator<>::GetQueryResults", DateTime.Now.Ticks); // All mandatory query settings must be specified Contract.Assert(querySettings.TaskScheduler != null); Contract.Assert(querySettings.DegreeOfParallelism.HasValue); Contract.Assert(querySettings.ExecutionMode.HasValue); // Now just open the query tree's root operator, supplying a specific DOP return Open(querySettings, false); } //---------------------------------------------------------------------------------------- // Executes the query and returns the results in an array. // internal TOutput[] ExecuteAndGetResultsAsArray() { QuerySettings querySettings = SpecifiedQuerySettings .WithPerExecutionSettings() .WithDefaults(); QueryLifecycle.LogicalQueryExecutionBegin(querySettings.QueryId); try { if (querySettings.ExecutionMode.Value == ParallelExecutionMode.Default && LimitsParallelism) { IEnumerable opSequential = AsSequentialQuery(querySettings.CancellationState.ExternalCancellationToken); IEnumerable opSequentialWithCancelChecks = CancellableEnumerable.Wrap(opSequential, querySettings.CancellationState.ExternalCancellationToken); return ExceptionAggregator.WrapEnumerable(opSequentialWithCancelChecks, querySettings.CancellationState).ToArray(); } QueryResults results = GetQueryResults(querySettings); if (results.IsIndexible && OutputOrdered) { // The special array-based merge performs better if the output is ordered, because // it does not have to pay for ordering. In the unordered case, we it appears that // the stop-and-go merge performs a little better. ArrayMergeHelper merger = new ArrayMergeHelper (SpecifiedQuerySettings, results); merger.Execute(); TOutput[] output = merger.GetResultsAsArray(); querySettings.CleanStateAtQueryEnd(); return output; } else { PartitionedStreamMerger merger = new PartitionedStreamMerger (false, ParallelMergeOptions.FullyBuffered, querySettings.TaskScheduler, OutputOrdered, querySettings.CancellationState, querySettings.QueryId); results.GivePartitionedStream(merger); TOutput[] output = merger.MergeExecutor.GetResultsAsArray(); querySettings.CleanStateAtQueryEnd(); return output; } } finally { QueryLifecycle.LogicalQueryExecutionEnd(querySettings.QueryId); } } //--------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // // Note that iterating the returned enumerable will not wrap exceptions AggregateException. // Before this enumerable is returned to the user, we must wrap it with an // ExceptionAggregator. // internal abstract IEnumerable AsSequentialQuery(CancellationToken token); //---------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal abstract bool LimitsParallelism { get; } //---------------------------------------------------------------------------------------- // The state of the order index of the results returned by this operator. // internal abstract OrdinalIndexState OrdinalIndexState { get; } //--------------------------------------------------------------------------------------- // A helper method that executes the query rooted at the openedChild operator, and returns // the results as ListQueryResults . // internal static ListQueryResults ExecuteAndCollectResults ( PartitionedStream openedChild, int partitionCount, bool outputOrdered, bool useStriping, QuerySettings settings) { TaskScheduler taskScheduler = settings.TaskScheduler; MergeExecutor executor = MergeExecutor .Execute ( openedChild, false, ParallelMergeOptions.FullyBuffered, taskScheduler, outputOrdered, settings.CancellationState, settings.QueryId); return new ListQueryResults (executor.GetResultsAsArray(), partitionCount, useStriping); } //---------------------------------------------------------------------------------------- // Returns a QueryOperator for any IEnumerable data source. This will just do a // cast and return a reference to the same data source if the source is another query // operator, but will lazily allocate a scan operation and return that otherwise. // // Arguments: // source - any enumerable data source to be wrapped // // Return Value: // A query operator. // internal static QueryOperator AsQueryOperator(IEnumerable source) { Contract.Assert(source != null); // Just try casting the data source to a query operator, in the case that // our child is just another query operator. QueryOperator sourceAsOperator = source as QueryOperator ; if (sourceAsOperator == null) { OrderedParallelQuery orderedQuery = source as OrderedParallelQuery ; if (orderedQuery != null) { // We have to handle OrderedParallelQuery specially. In all other cases, // ParallelQuery *is* the QueryOperator . But, OrderedParallelQuery // is not QueryOperator , it only has a reference to one. Ideally, we // would want SortQueryOperator to inherit from OrderedParallelQuery , // but that conflicts with other constraints on our class hierarchy. sourceAsOperator = (QueryOperator )orderedQuery.SortOperator; } else { // If the cast failed, then the data source is a real piece of data. We // just construct a new scan operator on top of it. sourceAsOperator = new ScanQueryOperator (source); } } Contract.Assert(sourceAsOperator != null); return sourceAsOperator; } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- GeometryConverter.cs
- SqlResolver.cs
- ZoneMembershipCondition.cs
- Utils.cs
- TabletDeviceInfo.cs
- UnicodeEncoding.cs
- PrintingPermission.cs
- TracedNativeMethods.cs
- ProfilePropertySettingsCollection.cs
- ParameterBuilder.cs
- LinqDataSourceDisposeEventArgs.cs
- MemberDescriptor.cs
- WorkflowInspectionServices.cs
- SerializableAttribute.cs
- Int16AnimationUsingKeyFrames.cs
- DataGridViewButtonColumn.cs
- BaseTreeIterator.cs
- WsdlEndpointConversionContext.cs
- PathSegmentCollection.cs
- DataRecordInfo.cs
- StrokeCollectionDefaultValueFactory.cs
- MobileResource.cs
- OracleBoolean.cs
- StylusLogic.cs
- ObjectDataSourceFilteringEventArgs.cs
- TabPage.cs
- MetadataHelper.cs
- XmlSchemaComplexType.cs
- DataGridViewRowStateChangedEventArgs.cs
- StronglyTypedResourceBuilder.cs
- ProofTokenCryptoHandle.cs
- QueryCacheEntry.cs
- SqlReorderer.cs
- X509CertificateClaimSet.cs
- AppDomainManager.cs
- TdsParserSessionPool.cs
- UIElement.cs
- MetaTable.cs
- ObjectConverter.cs
- IISUnsafeMethods.cs
- ClientConvert.cs
- BindToObject.cs
- XmlSchemaType.cs
- Win32Exception.cs
- ScaleTransform.cs
- ButtonPopupAdapter.cs
- DataSetUtil.cs
- BasicDesignerLoader.cs
- RequestTimeoutManager.cs
- ViewService.cs
- EventLogPermissionEntry.cs
- ApplicationException.cs
- X509InitiatorCertificateClientElement.cs
- GeneralTransformCollection.cs
- UriWriter.cs
- DynamicResourceExtensionConverter.cs
- Contracts.cs
- UnsafeNetInfoNativeMethods.cs
- EtwTrace.cs
- BidirectionalDictionary.cs
- Membership.cs
- ProtocolsSection.cs
- MaterializeFromAtom.cs
- CSharpCodeProvider.cs
- BoundColumn.cs
- InteropEnvironment.cs
- AssemblyName.cs
- OutArgument.cs
- ColorConvertedBitmapExtension.cs
- ImageFormatConverter.cs
- XmlUrlResolver.cs
- DataServiceException.cs
- TimerElapsedEvenArgs.cs
- UseAttributeSetsAction.cs
- DictionarySectionHandler.cs
- ReadOnlyCollectionBase.cs
- indexingfiltermarshaler.cs
- ContractNamespaceAttribute.cs
- HttpVersion.cs
- followingquery.cs
- ItemsControl.cs
- KeyedHashAlgorithm.cs
- NavigateEvent.cs
- DbUpdateCommandTree.cs
- EventLogTraceListener.cs
- WebConfigManager.cs
- PowerModeChangedEventArgs.cs
- XmlILTrace.cs
- ComboBoxRenderer.cs
- IncomingWebResponseContext.cs
- ObjectAnimationBase.cs
- TextEndOfLine.cs
- UrlPath.cs
- SafeArchiveContext.cs
- FileDialog_Vista_Interop.cs
- GetPageNumberCompletedEventArgs.cs
- SqlNodeAnnotations.cs
- SecurityTokenSerializer.cs
- SessionStateUtil.cs
- SoapObjectReader.cs