Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Binary / ZipQueryOperator.cs / 1305376 / ZipQueryOperator.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ZipQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
namespace System.Linq.Parallel
{
///
/// A Zip operator combines two input data sources into a single output stream,
/// using a pairwise element matching algorithm. For example, the result of zipping
/// two vectors a = {0, 1, 2, 3} and b = {9, 8, 7, 6} is the vector of pairs,
/// c = {(0,9), (1,8), (2,7), (3,6)}. Because the expectation is that each element
/// is matched with the element in the other data source at the same ordinal
/// position, the zip operator requires order preservation.
///
///
///
///
internal sealed class ZipQueryOperator
: QueryOperator
{
private readonly Func m_resultSelector; // To select result elements.
private readonly QueryOperator m_leftChild;
private readonly QueryOperator m_rightChild;
private readonly bool m_prematureMergeLeft = false; // Whether to prematurely merge the left data source
private readonly bool m_prematureMergeRight = false; // Whether to prematurely merge the right data source
//----------------------------------------------------------------------------------------
// Initializes a new zip operator.
//
// Arguments:
// leftChild - the left data source from which to pull data.
// rightChild - the right data source from which to pull data.
//
internal ZipQueryOperator(
ParallelQuery leftChildSource, IEnumerable rightChildSource,
Func resultSelector)
:this(
QueryOperator.AsQueryOperator(leftChildSource),
QueryOperator.AsQueryOperator(rightChildSource),
resultSelector)
{
}
private ZipQueryOperator(
QueryOperator left, QueryOperator right,
Func resultSelector)
: base(left.SpecifiedQuerySettings.Merge(right.SpecifiedQuerySettings))
{
Contract.Assert(resultSelector != null, "operator cannot be null");
m_leftChild = left;
m_rightChild = right;
m_resultSelector = resultSelector;
m_outputOrdered = m_leftChild.OutputOrdered || m_rightChild.OutputOrdered;
m_prematureMergeLeft = m_leftChild.OrdinalIndexState != OrdinalIndexState.Indexible;
m_prematureMergeRight = m_rightChild.OrdinalIndexState != OrdinalIndexState.Indexible;
}
//---------------------------------------------------------------------------------------
// Just opens the current operator, including opening the children and wrapping them with
// partitions as needed.
//
internal override QueryResults Open(QuerySettings settings, bool preferStriping)
{
// We just open our child operators, left and then right.
QueryResults leftChildResults = m_leftChild.Open(settings, preferStriping);
QueryResults rightChildResults = m_rightChild.Open(settings, preferStriping);
int partitionCount = settings.DegreeOfParallelism.Value;
if (m_prematureMergeLeft)
{
PartitionedStreamMerger merger = new PartitionedStreamMerger(
false, ParallelMergeOptions.FullyBuffered, settings.TaskScheduler, m_leftChild.OutputOrdered,
settings.CancellationState, settings.QueryId);
leftChildResults.GivePartitionedStream(merger);
leftChildResults = new ListQueryResults(
merger.MergeExecutor.GetResultsAsArray(), partitionCount, preferStriping);
}
if (m_prematureMergeRight)
{
PartitionedStreamMerger merger = new PartitionedStreamMerger(
false, ParallelMergeOptions.FullyBuffered, settings.TaskScheduler, m_rightChild.OutputOrdered,
settings.CancellationState, settings.QueryId);
rightChildResults.GivePartitionedStream(merger);
rightChildResults = new ListQueryResults(
merger.MergeExecutor.GetResultsAsArray(), partitionCount, preferStriping);
}
return new ZipQueryOperatorResults(leftChildResults, rightChildResults, m_resultSelector, partitionCount, preferStriping);
}
//---------------------------------------------------------------------------------------
// Returns an enumerable that represents the query executing sequentially.
//
internal override IEnumerable AsSequentialQuery(CancellationToken token)
{
using(IEnumerator leftEnumerator = m_leftChild.AsSequentialQuery(token).GetEnumerator())
using(IEnumerator rightEnumerator = m_rightChild.AsSequentialQuery(token).GetEnumerator())
{
while(leftEnumerator.MoveNext() && rightEnumerator.MoveNext())
{
yield return m_resultSelector(leftEnumerator.Current, rightEnumerator.Current);
}
}
}
//---------------------------------------------------------------------------------------
// The state of the order index of the results returned by this operator.
//
internal override OrdinalIndexState OrdinalIndexState
{
get
{
return OrdinalIndexState.Indexible;
}
}
//----------------------------------------------------------------------------------------
// Whether this operator performs a premature merge.
//
internal override bool LimitsParallelism
{
get
{
return m_prematureMergeLeft || m_prematureMergeRight;
}
}
//---------------------------------------------------------------------------------------
// A special QueryResults class for the Zip operator. It requires that both of the child
// QueryResults are indexible.
//
internal class ZipQueryOperatorResults : QueryResults
{
private readonly QueryResults m_leftChildResults;
private readonly QueryResults m_rightChildResults;
private readonly Func m_resultSelector; // To select result elements.
private readonly int m_count;
private readonly int m_partitionCount;
private readonly bool m_preferStriping;
internal ZipQueryOperatorResults(
QueryResults leftChildResults, QueryResults rightChildResults,
Func resultSelector, int partitionCount, bool preferStriping)
{
m_leftChildResults = leftChildResults;
m_rightChildResults = rightChildResults;
m_resultSelector = resultSelector;
m_partitionCount = partitionCount;
m_preferStriping = preferStriping;
Contract.Assert(m_leftChildResults.IsIndexible);
Contract.Assert(m_rightChildResults.IsIndexible);
m_count = Math.Min(m_leftChildResults.Count, m_rightChildResults.Count);
}
internal override int ElementsCount
{
get { return m_count; }
}
internal override bool IsIndexible
{
get { return true; }
}
internal override TOutput GetElement(int index)
{
return m_resultSelector(m_leftChildResults.GetElement(index), m_rightChildResults.GetElement(index));
}
internal override void GivePartitionedStream(IPartitionedStreamRecipient recipient)
{
PartitionedStream partitionedStream = ExchangeUtilities.PartitionDataSource(this, m_partitionCount, m_preferStriping);
recipient.Receive(partitionedStream);
}
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- XmlCodeExporter.cs
- InstanceCreationEditor.cs
- SslStream.cs
- ResolveNameEventArgs.cs
- UIHelper.cs
- CalendarAutomationPeer.cs
- HttpStreamMessageEncoderFactory.cs
- VerticalAlignConverter.cs
- StickyNoteHelper.cs
- PageEventArgs.cs
- ChildChangedEventArgs.cs
- TableCellsCollectionEditor.cs
- ServiceOperationParameter.cs
- ISFClipboardData.cs
- EdmItemCollection.OcAssemblyCache.cs
- EventHandlerList.cs
- XmlCDATASection.cs
- SHA512Cng.cs
- RealizationDrawingContextWalker.cs
- EndOfStreamException.cs
- RegistrySecurity.cs
- SessionEndingEventArgs.cs
- CorrelationRequestContext.cs
- CdpEqualityComparer.cs
- ACL.cs
- AccessedThroughPropertyAttribute.cs
- MobileUITypeEditor.cs
- RangeValueProviderWrapper.cs
- JoinElimination.cs
- ZipIOLocalFileBlock.cs
- SecureStringHasher.cs
- PageThemeParser.cs
- Baml2006ReaderContext.cs
- odbcmetadatafactory.cs
- ControlCommandSet.cs
- CompoundFileStorageReference.cs
- HostingEnvironmentException.cs
- TableLayoutSettingsTypeConverter.cs
- Attributes.cs
- _IPv4Address.cs
- SamlAudienceRestrictionCondition.cs
- PerfCounters.cs
- GestureRecognizer.cs
- TextModifier.cs
- IisTraceListener.cs
- TypeDescriptionProviderAttribute.cs
- HandlerFactoryWrapper.cs
- Point3DCollection.cs
- MediaCommands.cs
- SqlConnectionString.cs
- NestedContainer.cs
- SQLBinaryStorage.cs
- TextElementEnumerator.cs
- UmAlQuraCalendar.cs
- AgileSafeNativeMemoryHandle.cs
- RichTextBoxConstants.cs
- ContextMarshalException.cs
- ElementNotEnabledException.cs
- DynamicResourceExtension.cs
- Rotation3D.cs
- DynamicExpression.cs
- ControlParameter.cs
- ImpersonateTokenRef.cs
- OleDbCommand.cs
- RunInstallerAttribute.cs
- VirtualPathProvider.cs
- invalidudtexception.cs
- IFormattable.cs
- WarningException.cs
- WebServiceEndpoint.cs
- NetworkStream.cs
- ScrollableControl.cs
- QueryCoreOp.cs
- TemplateParser.cs
- TreeNode.cs
- XhtmlConformanceSection.cs
- Code.cs
- NativeMethods.cs
- SinglePageViewer.cs
- DaylightTime.cs
- NamespaceDecl.cs
- counter.cs
- SmiSettersStream.cs
- DependencyStoreSurrogate.cs
- DefaultPrintController.cs
- StateInitializationDesigner.cs
- DateTimeOffsetStorage.cs
- FilterRepeater.cs
- SqlCacheDependencyDatabase.cs
- DecimalKeyFrameCollection.cs
- SessionStateContainer.cs
- RepeatButton.cs
- TrackingWorkflowEventArgs.cs
- HexParser.cs
- UpdateTracker.cs
- OperationParameterInfoCollection.cs
- AvTraceDetails.cs
- HeaderUtility.cs
- NaturalLanguageHyphenator.cs
- FieldNameLookup.cs