// Code:
// / 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Scheduling / OrderPreservingSpoolingTask.cs / 1305376 / OrderPreservingSpoolingTask.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// OrderPreservingSpoolingTask.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;
namespace System.Linq.Parallel
{
/// <summary>
/// A spooling task handles marshaling data from a producer to a consumer. This is the
/// order-preserving variant: every task runs its sort helper, and the task whose index
/// is 0 publishes the array returned by that helper into a shared result cell.
/// </summary>
internal class OrderPreservingSpoolingTask : SpoolingTaskBase
{
    // Shared cell that receives the output array; only written by the task with index 0.
    private Shared m_results;

    // Helper object that carries out the sort on behalf of this task.
    private SortHelper m_sortHelper;

    //------------------------------------------------------------------------------------
    // Builds a new spooling task without starting it.
    //
    // Arguments:
    //     taskIndex   - index of this task; forwarded to the base class
    //     groupState  - state of the query task group; forwarded to the base class
    //     results     - shared cell into which task 0 stores the array it sorted
    //     sortHelper  - helper whose Sort() method this task invokes
    //
    // Assumptions:
    //     groupState, results and sortHelper are all non-null (asserted below).
    //
    private OrderPreservingSpoolingTask(
        int taskIndex, QueryTaskGroupState groupState,
        Shared results, SortHelper sortHelper)
        : base(taskIndex, groupState)
    {
        Contract.Assert(groupState != null);
        Contract.Assert(results != null);
        Contract.Assert(sortHelper != null);

        m_sortHelper = sortHelper;
        m_results = results;
    }

    //-----------------------------------------------------------------------------------
    // Creates one spooling task per partition and runs them all to completion under a
    // single root task. The root task itself is run synchronously on the calling
    // thread; inside it, every partition except the last is started asynchronously and
    // the last one is run synchronously. Pipelining is not supported by sort merges.
    //
    // Arguments:
    //     groupState    - state of the query task group; used to begin and end the query
    //     partitions    - the partitioned input; its PartitionCount decides the task count
    //     results       - shared cell that must be empty (Value == null) on entry
    //     taskScheduler - scheduler handed to every task that is run
    //
    // Assumptions:
    //     groupState, partitions and results are non-null (asserted below).
    //
    internal static void Spool(
        QueryTaskGroupState groupState, PartitionedStream partitions,
        Shared results, TaskScheduler taskScheduler)
    {
        Contract.Assert(groupState != null);
        Contract.Assert(partitions != null);
        Contract.Assert(results != null);
        Contract.Assert(results.Value == null);

        // Index of the final partition, which is also the number of asynchronous tasks.
        int lastPartition = partitions.PartitionCount - 1;

        // Sort helpers, indexed by partition number below.
        SortHelper[] helpers = SortHelper.GenerateSortHelpers(partitions, groupState);

        // A common root under which all tasks of this query are created.
        Task root = new Task(
            () =>
            {
                // Partitions [0, lastPartition) are handed to asynchronous tasks.
                for (int partition = 0; partition < lastPartition; partition++)
                {
                    TraceHelpers.TraceInfo("OrderPreservingSpoolingTask::Spool: Running partition[{0}] asynchronously", partition);

                    QueryTask background = new OrderPreservingSpoolingTask(
                        partition, groupState, results, helpers[partition]);
                    background.RunAsynchronously(taskScheduler);
                }

                // The remaining partition is processed on the current thread.
                TraceHelpers.TraceInfo("OrderPreservingSpoolingTask::Spool: Running partition[{0}] synchronously", lastPartition);

                QueryTask foreground = new OrderPreservingSpoolingTask(
                    lastPartition, groupState, results, helpers[lastPartition]);
                foreground.RunSynchronously(taskScheduler);
            });

        // Register the root with the group state before it starts executing.
        groupState.QueryBegin(root);

        // Run the root on the calling thread so that we do not return until it is finished.
        root.RunSynchronously(taskScheduler);

        // Release the state held by the sort helpers.
        foreach (SortHelper helper in helpers)
        {
            helper.Dispose();
        }

        // End the query, which has the effect of propagating any unhandled exceptions.
        groupState.QueryEnd(false);
    }

    //-----------------------------------------------------------------------------------
    // Overrides the base spooling work: runs the sort helper and, unless cancellation
    // has been requested, lets the task with index 0 publish the resulting array.
    //
    protected override void SpoolingWork()
    {
        Contract.Assert(m_sortHelper != null);

        // The sort always runs; cancellation is only consulted afterwards.
        TInputOutput[] sorted = m_sortHelper.Sort();

        if (m_groupState.CancellationState.MergedCancellationToken.IsCancellationRequested)
        {
            return;
        }

        // Only the 0th task communicates results to the merging infrastructure.
        if (m_taskIndex != 0)
        {
            return;
        }

        Contract.Assert(sorted != null);
        m_results.Value = sorted;
    }
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// OrderPreservingSpoolingTask.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;
namespace System.Linq.Parallel
{
/// <summary>
/// A spooling task handles marshaling data from a producer to a consumer. This is the
/// order-preserving variant: every task runs its sort helper, and the task whose index
/// is 0 publishes the array returned by that helper into a shared result cell.
/// </summary>
internal class OrderPreservingSpoolingTask : SpoolingTaskBase
{
    // Shared cell that receives the output array; only written by the task with index 0.
    private Shared m_results;

    // Helper object that carries out the sort on behalf of this task.
    private SortHelper m_sortHelper;

    //------------------------------------------------------------------------------------
    // Builds a new spooling task without starting it.
    //
    // Arguments:
    //     taskIndex   - index of this task; forwarded to the base class
    //     groupState  - state of the query task group; forwarded to the base class
    //     results     - shared cell into which task 0 stores the array it sorted
    //     sortHelper  - helper whose Sort() method this task invokes
    //
    // Assumptions:
    //     groupState, results and sortHelper are all non-null (asserted below).
    //
    private OrderPreservingSpoolingTask(
        int taskIndex, QueryTaskGroupState groupState,
        Shared results, SortHelper sortHelper)
        : base(taskIndex, groupState)
    {
        Contract.Assert(groupState != null);
        Contract.Assert(results != null);
        Contract.Assert(sortHelper != null);

        m_sortHelper = sortHelper;
        m_results = results;
    }

    //-----------------------------------------------------------------------------------
    // Creates one spooling task per partition and runs them all to completion under a
    // single root task. The root task itself is run synchronously on the calling
    // thread; inside it, every partition except the last is started asynchronously and
    // the last one is run synchronously. Pipelining is not supported by sort merges.
    //
    // Arguments:
    //     groupState    - state of the query task group; used to begin and end the query
    //     partitions    - the partitioned input; its PartitionCount decides the task count
    //     results       - shared cell that must be empty (Value == null) on entry
    //     taskScheduler - scheduler handed to every task that is run
    //
    // Assumptions:
    //     groupState, partitions and results are non-null (asserted below).
    //
    internal static void Spool(
        QueryTaskGroupState groupState, PartitionedStream partitions,
        Shared results, TaskScheduler taskScheduler)
    {
        Contract.Assert(groupState != null);
        Contract.Assert(partitions != null);
        Contract.Assert(results != null);
        Contract.Assert(results.Value == null);

        // Index of the final partition, which is also the number of asynchronous tasks.
        int lastPartition = partitions.PartitionCount - 1;

        // Sort helpers, indexed by partition number below.
        SortHelper[] helpers = SortHelper.GenerateSortHelpers(partitions, groupState);

        // A common root under which all tasks of this query are created.
        Task root = new Task(
            () =>
            {
                // Partitions [0, lastPartition) are handed to asynchronous tasks.
                for (int partition = 0; partition < lastPartition; partition++)
                {
                    TraceHelpers.TraceInfo("OrderPreservingSpoolingTask::Spool: Running partition[{0}] asynchronously", partition);

                    QueryTask background = new OrderPreservingSpoolingTask(
                        partition, groupState, results, helpers[partition]);
                    background.RunAsynchronously(taskScheduler);
                }

                // The remaining partition is processed on the current thread.
                TraceHelpers.TraceInfo("OrderPreservingSpoolingTask::Spool: Running partition[{0}] synchronously", lastPartition);

                QueryTask foreground = new OrderPreservingSpoolingTask(
                    lastPartition, groupState, results, helpers[lastPartition]);
                foreground.RunSynchronously(taskScheduler);
            });

        // Register the root with the group state before it starts executing.
        groupState.QueryBegin(root);

        // Run the root on the calling thread so that we do not return until it is finished.
        root.RunSynchronously(taskScheduler);

        // Release the state held by the sort helpers.
        foreach (SortHelper helper in helpers)
        {
            helper.Dispose();
        }

        // End the query, which has the effect of propagating any unhandled exceptions.
        groupState.QueryEnd(false);
    }

    //-----------------------------------------------------------------------------------
    // Overrides the base spooling work: runs the sort helper and, unless cancellation
    // has been requested, lets the task with index 0 publish the resulting array.
    //
    protected override void SpoolingWork()
    {
        Contract.Assert(m_sortHelper != null);

        // The sort always runs; cancellation is only consulted afterwards.
        TInputOutput[] sorted = m_sortHelper.Sort();

        if (m_groupState.CancellationState.MergedCancellationToken.IsCancellationRequested)
        {
            return;
        }

        // Only the 0th task communicates results to the merging infrastructure.
        if (m_taskIndex != 0)
        {
            return;
        }

        Contract.Assert(sorted != null);
        m_results.Value = sorted;
    }
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- AutoFocusStyle.xaml.cs
- RegexFCD.cs
- AsyncPostBackErrorEventArgs.cs
- WebPartDescriptionCollection.cs
- ServiceTimeoutsElement.cs
- ConfigurationManagerHelperFactory.cs
- FederatedMessageSecurityOverHttpElement.cs
- PropertyPushdownHelper.cs
- HwndProxyElementProvider.cs
- RegexRunnerFactory.cs
- CellParaClient.cs
- XmlDataSourceDesigner.cs
- NegatedCellConstant.cs
- PersonalizationAdministration.cs
- IndentTextWriter.cs
- DesignerForm.cs
- PartialCachingControl.cs
- dsa.cs
- UnaryOperationBinder.cs
- ReadOnlyPropertyMetadata.cs
- TreeNode.cs
- DataGridColumn.cs
- DoubleLink.cs
- LazyTextWriterCreator.cs
- EmptyWorkItem.cs
- DbModificationClause.cs
- MailMessageEventArgs.cs
- SchemaObjectWriter.cs
- CustomTypeDescriptor.cs
- GZipDecoder.cs
- SafeHandles.cs
- OracleMonthSpan.cs
- BamlMapTable.cs
- PropertyInformationCollection.cs
- oledbconnectionstring.cs
- ClientUtils.cs
- ElementsClipboardData.cs
- AssertHelper.cs
- X509ChainPolicy.cs
- DataList.cs
- EntityTypeEmitter.cs
- UmAlQuraCalendar.cs
- ColumnWidthChangingEvent.cs
- AutomationTextAttribute.cs
- PointConverter.cs
- Gdiplus.cs
- WindowsToolbar.cs
- HandlerBase.cs
- ListViewInsertedEventArgs.cs
- HandleRef.cs
- CommandConverter.cs
- InvokeProviderWrapper.cs
- QuerySelectOp.cs
- BigInt.cs
- DocumentApplicationJournalEntry.cs
- PrtCap_Base.cs
- WebPartDescription.cs
- RuleRefElement.cs
- Point3D.cs
- NeutralResourcesLanguageAttribute.cs
- DesignerActionUI.cs
- SystemDiagnosticsSection.cs
- ReadOnlyAttribute.cs
- DesignerActionItemCollection.cs
- ArglessEventHandlerProxy.cs
- SQLInt64.cs
- Range.cs
- SqlTrackingWorkflowInstance.cs
- WindowsSecurityToken.cs
- X509ChainPolicy.cs
- TCEAdapterGenerator.cs
- StrokeNodeData.cs
- InProcStateClientManager.cs
- WebPageTraceListener.cs
- MutexSecurity.cs
- AccessText.cs
- SafeNativeMethods.cs
- EncodedStreamFactory.cs
- StoreItemCollection.cs
- ReversePositionQuery.cs
- TypeInitializationException.cs
- XmlWellformedWriter.cs
- MessageQueueException.cs
- Utility.cs
- FileLogRecordEnumerator.cs
- ReferenceConverter.cs
- AQNBuilder.cs
- DrawingDrawingContext.cs
- RoutedEvent.cs
- WindowsMenu.cs
- AppDomainProtocolHandler.cs
- Globals.cs
- RequestBringIntoViewEventArgs.cs
- ListBindableAttribute.cs
- Automation.cs
- StringInfo.cs
- Animatable.cs
- TraceContextRecord.cs
- WindowsListBox.cs
- PointAnimationClockResource.cs