Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Scheduling / OrderPreservingSpoolingTask.cs / 1305376 / OrderPreservingSpoolingTask.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// OrderPreservingSpoolingTask.cs
//
//[....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    /// <summary>
    /// A spooling task handles marshaling data from a producer to a consumer. It's given
    /// a single enumerator object that contains all of the production algorithms, a single
    /// destination channel from which consumers draw results, and (optionally) a
    /// synchronization primitive using which to notify asynchronous consumers. This
    /// particular task variant preserves sort order in the final data.
    /// </summary>
    /// <typeparam name="TInputOutput">Element type of the sorted array produced by the sort helper.</typeparam>
    /// <typeparam name="TKey">Key type of the partitioned stream (presumably the sort key; confirm).</typeparam>
    //
    // NOTE(review): the generic type arguments (<TInputOutput, TKey>, Shared<TInputOutput[]>,
    // SortHelper<...>, PartitionedStream<...>) were stripped from this listing and have been
    // reconstructed from the surviving uses of TInputOutput[] -- confirm against the original source.
    //
    internal class OrderPreservingSpoolingTask<TInputOutput, TKey> : SpoolingTaskBase
    {
        private readonly Shared<TInputOutput[]> m_results; // The destination array cell into which data is placed.
        private readonly SortHelper<TInputOutput> m_sortHelper; // A helper that performs the sorting.

        //------------------------------------------------------------------------------------
        // Creates, but does not execute, a new spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task
        //     groupState  - the state shared by all tasks of the query
        //     results     - the shared cell into which the sorted array is placed
        //     sortHelper  - the helper that performs the sort for this task
        //
        // Assumptions:
        //     groupState, results and sortHelper must not be null (asserted below).
        //

        private OrderPreservingSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            Shared<TInputOutput[]> results, SortHelper<TInputOutput> sortHelper) :
            base(taskIndex, groupState)
        {
            Contract.Assert(groupState != null);
            Contract.Assert(results != null);
            Contract.Assert(sortHelper != null);

            m_results = results;
            m_sortHelper = sortHelper;
        }

        //-----------------------------------------------------------------------------------
        // Creates and runs one spooling task per partition. All but the last partition are
        // started asynchronously; the last one runs synchronously inside the root task, and
        // the root task itself is run synchronously on the calling thread. After the root
        // task has run, the sort helpers are disposed and the query is ended.
        //
        // Arguments:
        //     groupState     - the state shared by all tasks of the query
        //     partitions     - the partitioned input; one task is created per partition
        //     results        - the shared cell that receives the sorted array; its Value
        //                      must be null on entry
        //     taskScheduler  - the scheduler on which the tasks are run
        //
        // Assumptions:
        //     groupState, partitions and results must not be null (asserted below).
        //

        internal static void Spool(
            QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TKey> partitions,
            Shared<TInputOutput[]> results, TaskScheduler taskScheduler)
        {
            Contract.Assert(groupState != null);
            Contract.Assert(partitions != null);
            Contract.Assert(results != null);
            Contract.Assert(results.Value == null);

            // Determine how many async tasks to create.
            int maxToRunInParallel = partitions.PartitionCount - 1;

            // Generate a set of sort helpers.
            SortHelper<TInputOutput, TKey>[] sortHelpers =
                SortHelper<TInputOutput, TKey>.GenerateSortHelpers(partitions, groupState);

            // Ensure all tasks in this query are parented under a common root.
            Task rootTask = new Task(
                () =>
                {
                    // Create tasks that will enumerate the partitions in parallel. We'll use the current
                    // thread for one task and then block before returning to the caller, until all results
                    // have been accumulated. Pipelining is not supported by sort merges.
                    for (int i = 0; i < maxToRunInParallel; i++)
                    {
                        TraceHelpers.TraceInfo("OrderPreservingSpoolingTask::Spool: Running partition[{0}] asynchronously", i);
                        QueryTask asyncTask = new OrderPreservingSpoolingTask<TInputOutput, TKey>(
                            i, groupState, results, sortHelpers[i]);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }

                    // Run one task synchronously on the current thread.
                    TraceHelpers.TraceInfo("OrderPreservingSpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel);
                    QueryTask syncTask = new OrderPreservingSpoolingTask<TInputOutput, TKey>(
                        maxToRunInParallel, groupState, results, sortHelpers[maxToRunInParallel]);
                    syncTask.RunSynchronously(taskScheduler);
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);

            // We don't want to return until the task is finished. Run it on the calling thread.
            rootTask.RunSynchronously(taskScheduler);

            // Destroy the state associated with our sort helpers.
            for (int i = 0; i < sortHelpers.Length; i++)
            {
                sortHelpers[i].Dispose();
            }

            // End the query, which has the effect of propagating any unhandled exceptions.
            groupState.QueryEnd(false);
        }

        //-----------------------------------------------------------------------------------
        // This method is responsible for enumerating results and enqueueing them to
        // the output channel(s) as appropriate. Each derived class implements its own;
        // this one sorts and, for task 0 only, publishes the sorted array.
        //

        protected override void SpoolingWork()
        {
            Contract.Assert(m_sortHelper != null);

            // This task must perform a sort just prior to handing data to the merge.
            // We just defer to a sort helper object for this task.
            TInputOutput[] sortedOutput = m_sortHelper.Sort();

            // Nothing is published once cancellation has been requested.
            if (!m_groupState.CancellationState.MergedCancellationToken.IsCancellationRequested)
            {
                // The 0th task is responsible for communicating the results to the merging infrastructure.
                // By this point, the results have been sorted, so we just publish a reference to the array.
                if (m_taskIndex == 0)
                {
                    Contract.Assert(sortedOutput != null);
                    m_results.Value = sortedOutput;
                }
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// OrderPreservingSpoolingTask.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    /// <summary>
    /// A spooling task handles marshaling data from a producer to a consumer. It's given
    /// a single enumerator object that contains all of the production algorithms, a single
    /// destination channel from which consumers draw results, and (optionally) a
    /// synchronization primitive using which to notify asynchronous consumers. This
    /// particular task variant preserves sort order in the final data.
    /// </summary>
    /// <typeparam name="TInputOutput">Element type of the sorted array produced by the sort helper.</typeparam>
    /// <typeparam name="TKey">Key type of the partitioned stream (presumably the sort key; confirm).</typeparam>
    //
    // NOTE(review): the generic type arguments (<TInputOutput, TKey>, Shared<TInputOutput[]>,
    // SortHelper<...>, PartitionedStream<...>) were stripped from this listing and have been
    // reconstructed from the surviving uses of TInputOutput[] -- confirm against the original source.
    //
    internal class OrderPreservingSpoolingTask<TInputOutput, TKey> : SpoolingTaskBase
    {
        private readonly Shared<TInputOutput[]> m_results; // The destination array cell into which data is placed.
        private readonly SortHelper<TInputOutput> m_sortHelper; // A helper that performs the sorting.

        //------------------------------------------------------------------------------------
        // Creates, but does not execute, a new spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task
        //     groupState  - the state shared by all tasks of the query
        //     results     - the shared cell into which the sorted array is placed
        //     sortHelper  - the helper that performs the sort for this task
        //
        // Assumptions:
        //     groupState, results and sortHelper must not be null (asserted below).
        //

        private OrderPreservingSpoolingTask(
            int taskIndex, QueryTaskGroupState groupState,
            Shared<TInputOutput[]> results, SortHelper<TInputOutput> sortHelper) :
            base(taskIndex, groupState)
        {
            Contract.Assert(groupState != null);
            Contract.Assert(results != null);
            Contract.Assert(sortHelper != null);

            m_results = results;
            m_sortHelper = sortHelper;
        }

        //-----------------------------------------------------------------------------------
        // Creates and runs one spooling task per partition. All but the last partition are
        // started asynchronously; the last one runs synchronously inside the root task, and
        // the root task itself is run synchronously on the calling thread. After the root
        // task has run, the sort helpers are disposed and the query is ended.
        //
        // Arguments:
        //     groupState     - the state shared by all tasks of the query
        //     partitions     - the partitioned input; one task is created per partition
        //     results        - the shared cell that receives the sorted array; its Value
        //                      must be null on entry
        //     taskScheduler  - the scheduler on which the tasks are run
        //
        // Assumptions:
        //     groupState, partitions and results must not be null (asserted below).
        //

        internal static void Spool(
            QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TKey> partitions,
            Shared<TInputOutput[]> results, TaskScheduler taskScheduler)
        {
            Contract.Assert(groupState != null);
            Contract.Assert(partitions != null);
            Contract.Assert(results != null);
            Contract.Assert(results.Value == null);

            // Determine how many async tasks to create.
            int maxToRunInParallel = partitions.PartitionCount - 1;

            // Generate a set of sort helpers.
            SortHelper<TInputOutput, TKey>[] sortHelpers =
                SortHelper<TInputOutput, TKey>.GenerateSortHelpers(partitions, groupState);

            // Ensure all tasks in this query are parented under a common root.
            Task rootTask = new Task(
                () =>
                {
                    // Create tasks that will enumerate the partitions in parallel. We'll use the current
                    // thread for one task and then block before returning to the caller, until all results
                    // have been accumulated. Pipelining is not supported by sort merges.
                    for (int i = 0; i < maxToRunInParallel; i++)
                    {
                        TraceHelpers.TraceInfo("OrderPreservingSpoolingTask::Spool: Running partition[{0}] asynchronously", i);
                        QueryTask asyncTask = new OrderPreservingSpoolingTask<TInputOutput, TKey>(
                            i, groupState, results, sortHelpers[i]);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }

                    // Run one task synchronously on the current thread.
                    TraceHelpers.TraceInfo("OrderPreservingSpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel);
                    QueryTask syncTask = new OrderPreservingSpoolingTask<TInputOutput, TKey>(
                        maxToRunInParallel, groupState, results, sortHelpers[maxToRunInParallel]);
                    syncTask.RunSynchronously(taskScheduler);
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);

            // We don't want to return until the task is finished. Run it on the calling thread.
            rootTask.RunSynchronously(taskScheduler);

            // Destroy the state associated with our sort helpers.
            for (int i = 0; i < sortHelpers.Length; i++)
            {
                sortHelpers[i].Dispose();
            }

            // End the query, which has the effect of propagating any unhandled exceptions.
            groupState.QueryEnd(false);
        }

        //-----------------------------------------------------------------------------------
        // This method is responsible for enumerating results and enqueueing them to
        // the output channel(s) as appropriate. Each derived class implements its own;
        // this one sorts and, for task 0 only, publishes the sorted array.
        //

        protected override void SpoolingWork()
        {
            Contract.Assert(m_sortHelper != null);

            // This task must perform a sort just prior to handing data to the merge.
            // We just defer to a sort helper object for this task.
            TInputOutput[] sortedOutput = m_sortHelper.Sort();

            // Nothing is published once cancellation has been requested.
            if (!m_groupState.CancellationState.MergedCancellationToken.IsCancellationRequested)
            {
                // The 0th task is responsible for communicating the results to the merging infrastructure.
                // By this point, the results have been sorted, so we just publish a reference to the array.
                if (m_taskIndex == 0)
                {
                    Contract.Assert(sortedOutput != null);
                    m_results.Value = sortedOutput;
                }
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- SymDocumentType.cs
- WizardForm.cs
- CapiNative.cs
- HttpStaticObjectsCollectionWrapper.cs
- TransformationRules.cs
- SafeSystemMetrics.cs
- XmlParserContext.cs
- MaskedTextBoxTextEditorDropDown.cs
- ReplacementText.cs
- ContentType.cs
- PauseStoryboard.cs
- KeyInterop.cs
- SelectionManager.cs
- Conditional.cs
- ComponentFactoryHelpers.cs
- datacache.cs
- PostBackTrigger.cs
- RelationshipConverter.cs
- CustomLineCap.cs
- DataViewSettingCollection.cs
- DefaultValueAttribute.cs
- OrthographicCamera.cs
- NavigationPropertyEmitter.cs
- TableDetailsCollection.cs
- WebReferenceOptions.cs
- RequiredAttributeAttribute.cs
- TrackingValidationObjectDictionary.cs
- WebServiceBindingAttribute.cs
- DataViewListener.cs
- BaseProcessor.cs
- SafeHandle.cs
- StretchValidation.cs
- SqlUtil.cs
- Group.cs
- DataGridViewTextBoxEditingControl.cs
- ClientTargetCollection.cs
- WindowVisualStateTracker.cs
- SafeHandles.cs
- SqlAggregateChecker.cs
- SizeAnimationClockResource.cs
- RayHitTestParameters.cs
- StateMachineTimers.cs
- StateMachineWorkflow.cs
- HostExecutionContextManager.cs
- ToolStripControlHost.cs
- ElementsClipboardData.cs
- KnownTypes.cs
- login.cs
- CachedPathData.cs
- ResourcePermissionBaseEntry.cs
- AccessibleObject.cs
- DesignerSerializationOptionsAttribute.cs
- dsa.cs
- XmlReader.cs
- ListBox.cs
- QualifiedId.cs
- DbDeleteCommandTree.cs
- SpStreamWrapper.cs
- StringWriter.cs
- PixelShader.cs
- DataSourceCacheDurationConverter.cs
- SpellerHighlightLayer.cs
- SByteConverter.cs
- CodeVariableDeclarationStatement.cs
- MetadataCache.cs
- WebPartAuthorizationEventArgs.cs
- TextSpanModifier.cs
- ContextMarshalException.cs
- ConfigurationSectionGroupCollection.cs
- SamlAttributeStatement.cs
- ExecutionContext.cs
- _CacheStreams.cs
- XmlNavigatorFilter.cs
- StrokeSerializer.cs
- DelegatingTypeDescriptionProvider.cs
- InstanceStore.cs
- DataGridViewRowHeaderCell.cs
- MappedMetaModel.cs
- TextTrailingWordEllipsis.cs
- DefaultBinder.cs
- ConnectionAcceptor.cs
- SerializationObjectManager.cs
- HotCommands.cs
- CellIdBoolean.cs
- DuplicateMessageDetector.cs
- Trustee.cs
- IISUnsafeMethods.cs
- NumericUpDown.cs
- ParameterExpression.cs
- RSACryptoServiceProvider.cs
- HttpRequestWrapper.cs
- AutomationEventArgs.cs
- MethodBuilder.cs
- SignedPkcs7.cs
- ServicePointManagerElement.cs
- RemoteWebConfigurationHostServer.cs
- NativeMethods.cs
- DataServiceQuery.cs
- LightweightCodeGenerator.cs
- XmlAutoDetectWriter.cs