Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / SelectManyQueryOperator.cs / 1305376 / SelectManyQueryOperator.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // SelectManyQueryOperator.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// SelectMany is effectively a nested loops join. It is given two data sources, an /// outer and an inner -- actually, the inner is sometimes calculated by invoking a /// function for each outer element -- and we walk the outer, walking the entire /// inner enumerator for each outer element. There is an optional result selector /// function which can transform the output before yielding it as a result element. /// /// Notes: /// Although select many takes two enumerable objects as input, it appears to the /// query analysis infrastructure as a unary operator. That's because it works a /// little differently than the other binary operators: it has to re-open the right /// child every time an outer element is walked. The right child is NOT partitioned. /// ////// /// internal sealed class SelectManyQueryOperator : UnaryQueryOperator { private readonly Func > m_rightChildSelector; // To select a new child each iteration. private readonly Func > m_indexedRightChildSelector; // To select a new child each iteration. private readonly Func m_resultSelector; // An optional result selection function. private bool m_prematureMerge = false; // Whether to prematurely merge the input of this operator. //---------------------------------------------------------------------------------------- // Initializes a new select-many operator. // // Arguments: // leftChild - the left data source from which to pull data. // rightChild - the right data source from which to pull data. // rightChildSelector - if no right data source was supplied, the selector function // will generate a new right child for every unique left element. // resultSelector - a selection function for creating output elements. // internal SelectManyQueryOperator(IEnumerable leftChild, Func > rightChildSelector, Func > indexedRightChildSelector, Func resultSelector) :base(leftChild) { Contract.Assert(leftChild != null, "left child data source cannot be null"); Contract.Assert(rightChildSelector != null || indexedRightChildSelector != null, "either right child data or selector must be supplied"); Contract.Assert(rightChildSelector == null || indexedRightChildSelector == null, "either indexed- or non-indexed child selector must be supplied (not both)"); Contract.Assert(typeof(TRightInput) == typeof(TOutput) || resultSelector != null, "right input and output must be the same types, otherwise the result selector may not be null"); m_rightChildSelector = rightChildSelector; m_indexedRightChildSelector = indexedRightChildSelector; m_resultSelector = resultSelector; // If the SelectMany is indexed, elements must be returned in the order in which // indices were assigned. m_outputOrdered = Child.OutputOrdered || indexedRightChildSelector != null; InitOrderIndex(); } private void InitOrderIndex() { if (m_indexedRightChildSelector != null) { // If this is an indexed SelectMany, we need the order keys to be Correct, so that we can pass them // into the user delegate. m_prematureMerge = ExchangeUtilities.IsWorseThan(Child.OrdinalIndexState, OrdinalIndexState.Correct); } else { if (OutputOrdered) { // If the output of this SelectMany is ordered, the input keys must be at least increasing. The // SelectMany algorithm assumes that there will be no duplicate order keys, so if the order keys // are Shuffled, we need to merge prematurely. m_prematureMerge = ExchangeUtilities.IsWorseThan(Child.OrdinalIndexState, OrdinalIndexState.Increasing); } } SetOrdinalIndexState(OrdinalIndexState.Shuffled); } internal override void WrapPartitionedStream ( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) { int partitionCount = inputStream.PartitionCount; if (m_indexedRightChildSelector != null) { PartitionedStream inputStreamInt; // If the index is not correct, we need to reindex. if (m_prematureMerge) { ListQueryResults listResults = QueryOperator .ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings); inputStreamInt = listResults.GetPartitionedStream(); } else { inputStreamInt = (PartitionedStream )(object)inputStream; } WrapPartitionedStreamIndexed(inputStreamInt, recipient, settings); return; } // // if (m_prematureMerge) { PartitionedStream inputStreamInt = QueryOperator .ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings) .GetPartitionedStream(); WrapPartitionedStreamNotIndexed(inputStreamInt, recipient, settings); } else { WrapPartitionedStreamNotIndexed(inputStream, recipient, settings); } } /// /// A helper method for WrapPartitionedStream. We use the helper to reuse a block of code twice, but with /// a different order key type. (If premature merge occured, the order key type will be "int". Otherwise, /// it will be the same type as "TLeftKey" in WrapPartitionedStream.) /// private void WrapPartitionedStreamNotIndexed( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, QuerySettings settings) { int partitionCount = inputStream.PartitionCount; var keyComparer = new PairComparer (inputStream.KeyComparer, Util.GetDefaultComparer ()); var outputStream = new PartitionedStream >(partitionCount, keyComparer, OrdinalIndexState); for (int i = 0; i < partitionCount; i++) { outputStream[i] = new SelectManyQueryOperatorEnumerator (inputStream[i], this, settings.CancellationState.MergedCancellationToken); } recipient.Receive(outputStream); } /// /// Similar helper method to WrapPartitionedStreamNotIndexed, except that this one is for the indexed variant /// of SelectMany (i.e., the SelectMany that passes indices into the user sequence-generating delegate) /// private void WrapPartitionedStreamIndexed( PartitionedStreaminputStream, IPartitionedStreamRecipient recipient, QuerySettings settings) { var keyComparer = new PairComparer (inputStream.KeyComparer, Util.GetDefaultComparer ()); var outputStream = new PartitionedStream >(inputStream.PartitionCount, keyComparer, OrdinalIndexState); for (int i = 0; i < inputStream.PartitionCount; i++) { outputStream[i] = new IndexedSelectManyQueryOperatorEnumerator(inputStream[i], this, settings.CancellationState.MergedCancellationToken); } recipient.Receive(outputStream); } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the left child and wrapping with a // partition if needed. The right child is not opened yet -- this is always done on demand // as the outer elements are enumerated. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { QueryResults childQueryResults = Child.Open(settings, preferStriping); return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping); } //--------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { if (m_rightChildSelector != null) { if (m_resultSelector != null) { return CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_rightChildSelector, m_resultSelector); } return (IEnumerable )(object)(CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_rightChildSelector)); } else { Contract.Assert(m_indexedRightChildSelector != null); if (m_resultSelector != null) { return CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_indexedRightChildSelector, m_resultSelector); } return (IEnumerable )(object)(CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_indexedRightChildSelector)); } } //--------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return m_prematureMerge; } } //---------------------------------------------------------------------------------------- // The enumerator type responsible for executing the SelectMany logic. // class IndexedSelectManyQueryOperatorEnumerator : QueryOperatorEnumerator > { private readonly QueryOperatorEnumerator m_leftSource; // The left data source to enumerate. private readonly SelectManyQueryOperator m_selectManyOperator; // The select many operator to use. private IEnumerator m_currentRightSource; // The current enumerator we're using. private IEnumerator m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector). private Mutables m_mutables; // bag of frequently mutated value types [allocate in moveNext to avoid false-sharing] private readonly CancellationToken m_cancellationToken; private class Mutables { internal int m_currentRightSourceIndex = -1; // The index for the right data source. internal TLeftInput m_currentLeftElement; // The current element in the left data source. internal int m_currentLeftSourceIndex; // The current key in the left data source. internal int m_lhsCount; //counts the number of lhs elements enumerated. used for cancellation testing. } //--------------------------------------------------------------------------------------- // Instantiates a new select-many enumerator. Notice that the right data source is an // enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left // data source. // internal IndexedSelectManyQueryOperatorEnumerator(QueryOperatorEnumerator leftSource, SelectManyQueryOperator selectManyOperator, CancellationToken cancellationToken) { Contract.Assert(leftSource != null); Contract.Assert(selectManyOperator != null); m_leftSource = leftSource; m_selectManyOperator = selectManyOperator; m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Straightforward IEnumerator methods. // internal override bool MoveNext(ref TOutput currentElement, ref Pair currentKey) { while (true) { if (m_currentRightSource == null) { m_mutables = new Mutables(); // Check cancellation every few lhs-enumerations in case none of them are producing // any outputs. Otherwise, we rely on the consumer of this operator to be performing the checks. if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We don't have a "current" right enumerator to use. We have to fetch the next // one. If the left has run out of elements, however, we're done and just return // false right away. if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftSourceIndex)) { return false; } // Use the source selection routine to create a right child. IEnumerable rightChild = m_selectManyOperator.m_indexedRightChildSelector(m_mutables.m_currentLeftElement, m_mutables.m_currentLeftSourceIndex); Contract.Assert(rightChild != null); m_currentRightSource = rightChild.GetEnumerator(); Contract.Assert(m_currentRightSource != null); // If we have no result selector, we will need to access the Current element of the right // data source as though it is a TOutput. Unfortunately, we know that TRightInput must // equal TOutput (we check it during operator construction), but the type system doesn't. // Thus we would have to cast the result of invoking Current from type TRightInput to // TOutput. This is no good, since the results could be value types. Instead, we save the // enumerator object as an IEnumerator and access that later on. if (m_selectManyOperator.m_resultSelector == null) { m_currentRightSourceAsOutput = (IEnumerator )(object)m_currentRightSource; Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource, "these must be equal, otherwise the surrounding logic will be broken"); } } if (m_currentRightSource.MoveNext()) { m_mutables.m_currentRightSourceIndex++; // If the inner data source has an element, we can yield it. if (m_selectManyOperator.m_resultSelector != null) { // In the case of a selection function, use that to yield the next element. currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current); } else { // Otherwise, the right input and output types must be the same. We use the // casted copy of the current right source and just return its current element. Contract.Assert(m_currentRightSourceAsOutput != null); currentElement = m_currentRightSourceAsOutput.Current; } currentKey = new Pair (m_mutables.m_currentLeftSourceIndex, m_mutables.m_currentRightSourceIndex); return true; } else { // Otherwise, we have exhausted the right data source. Loop back around and try // to get the next left element, then its right, and so on. m_currentRightSource.Dispose(); m_currentRightSource = null; m_currentRightSourceAsOutput = null; } } } protected override void Dispose(bool disposing) { m_leftSource.Dispose(); if (m_currentRightSource != null) { m_currentRightSource.Dispose(); } } } //---------------------------------------------------------------------------------------- // The enumerator type responsible for executing the SelectMany logic. // class SelectManyQueryOperatorEnumerator : QueryOperatorEnumerator > { private readonly QueryOperatorEnumerator m_leftSource; // The left data source to enumerate. private readonly SelectManyQueryOperator m_selectManyOperator; // The select many operator to use. private IEnumerator m_currentRightSource; // The current enumerator we're using. private IEnumerator m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector). private Mutables m_mutables; // bag of frequently mutated value types [allocate in moveNext to avoid false-sharing] private readonly CancellationToken m_cancellationToken; private class Mutables { internal int m_currentRightSourceIndex = -1; // The index for the right data source. internal TLeftInput m_currentLeftElement; // The current element in the left data source. internal TLeftKey m_currentLeftKey; // The current key in the left data source. internal int m_lhsCount; // Counts the number of lhs elements enumerated. used for cancellation testing. } //--------------------------------------------------------------------------------------- // Instantiates a new select-many enumerator. Notice that the right data source is an // enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left // data source. // internal SelectManyQueryOperatorEnumerator(QueryOperatorEnumerator leftSource, SelectManyQueryOperator selectManyOperator, CancellationToken cancellationToken) { Contract.Assert(leftSource != null); Contract.Assert(selectManyOperator != null); m_leftSource = leftSource; m_selectManyOperator = selectManyOperator; m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Straightforward IEnumerator methods. // internal override bool MoveNext(ref TOutput currentElement, ref Pair currentKey) { while (true) { if (m_currentRightSource == null) { m_mutables = new Mutables(); // Check cancellation every few lhs-enumerations in case none of them are producing // any outputs. Otherwise, we rely on the consumer of this operator to be performing the checks. if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We don't have a "current" right enumerator to use. We have to fetch the next // one. If the left has run out of elements, however, we're done and just return // false right away. if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftKey)) { return false; } // Use the source selection routine to create a right child. IEnumerable rightChild = m_selectManyOperator.m_rightChildSelector(m_mutables.m_currentLeftElement); Contract.Assert(rightChild != null); m_currentRightSource = rightChild.GetEnumerator(); Contract.Assert(m_currentRightSource != null); // If we have no result selector, we will need to access the Current element of the right // data source as though it is a TOutput. Unfortunately, we know that TRightInput must // equal TOutput (we check it during operator construction), but the type system doesn't. // Thus we would have to cast the result of invoking Current from type TRightInput to // TOutput. This is no good, since the results could be value types. Instead, we save the // enumerator object as an IEnumerator and access that later on. if (m_selectManyOperator.m_resultSelector == null) { m_currentRightSourceAsOutput = (IEnumerator )(object)m_currentRightSource; Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource, "these must be equal, otherwise the surrounding logic will be broken"); } } if (m_currentRightSource.MoveNext()) { m_mutables.m_currentRightSourceIndex++; // If the inner data source has an element, we can yield it. if (m_selectManyOperator.m_resultSelector != null) { // In the case of a selection function, use that to yield the next element. currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current); } else { // Otherwise, the right input and output types must be the same. We use the // casted copy of the current right source and just return its current element. Contract.Assert(m_currentRightSourceAsOutput != null); currentElement = m_currentRightSourceAsOutput.Current; } currentKey = new Pair (m_mutables.m_currentLeftKey, m_mutables.m_currentRightSourceIndex); return true; } else { // Otherwise, we have exhausted the right data source. Loop back around and try // to get the next left element, then its right, and so on. m_currentRightSource.Dispose(); m_currentRightSource = null; m_currentRightSourceAsOutput = null; } } } protected override void Dispose(bool disposing) { m_leftSource.Dispose(); if (m_currentRightSource != null) { m_currentRightSource.Dispose(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- SupportingTokenSpecification.cs
- CorrelationExtension.cs
- NullExtension.cs
- EventLog.cs
- XmlAnyAttributeAttribute.cs
- LocalizationComments.cs
- TemplateAction.cs
- UrlMapping.cs
- DashStyles.cs
- WebScriptMetadataFormatter.cs
- PasswordRecoveryDesigner.cs
- UriTemplateLiteralQueryValue.cs
- Filter.cs
- PreviewControlDesigner.cs
- Variable.cs
- XmlCharCheckingReader.cs
- GenericWebPart.cs
- ChangePasswordDesigner.cs
- GeometryDrawing.cs
- GeneralTransformGroup.cs
- XmlTypeMapping.cs
- EditorReuseAttribute.cs
- Rotation3DAnimationBase.cs
- Annotation.cs
- StringComparer.cs
- OrderedEnumerableRowCollection.cs
- WebBrowserHelper.cs
- OrderingExpression.cs
- StrokeNodeData.cs
- QueueProcessor.cs
- RNGCryptoServiceProvider.cs
- Enum.cs
- dsa.cs
- _NestedSingleAsyncResult.cs
- XmlnsDictionary.cs
- ConfigXmlReader.cs
- FixedSOMElement.cs
- Rotation3DKeyFrameCollection.cs
- ServiceKnownTypeAttribute.cs
- OneOfScalarConst.cs
- NativeCppClassAttribute.cs
- ParameterModifier.cs
- WindowClosedEventArgs.cs
- XmlObjectSerializerReadContext.cs
- GeneralTransform.cs
- GroupBoxAutomationPeer.cs
- SemanticBasicElement.cs
- NoPersistProperty.cs
- ObjectListComponentEditor.cs
- ReadOnlyPropertyMetadata.cs
- latinshape.cs
- SecureConversationServiceCredential.cs
- CollectionAdapters.cs
- EdmComplexTypeAttribute.cs
- PerformanceCounterPermission.cs
- CompositeDataBoundControl.cs
- ThicknessAnimation.cs
- LogArchiveSnapshot.cs
- prefixendpointaddressmessagefilter.cs
- FontStyleConverter.cs
- HttpResponseHeader.cs
- AspProxy.cs
- loginstatus.cs
- ElementsClipboardData.cs
- ContainsSearchOperator.cs
- CompositeCollection.cs
- TypeToken.cs
- DependentList.cs
- TimeSpanOrInfiniteConverter.cs
- DesignTimeData.cs
- DrawingVisual.cs
- XDRSchema.cs
- SpeechDetectedEventArgs.cs
- StreamGeometryContext.cs
- BaseAutoFormat.cs
- ContainerParaClient.cs
- TreeSet.cs
- Menu.cs
- CallbackValidator.cs
- ContextMarshalException.cs
- DbDeleteCommandTree.cs
- FolderBrowserDialog.cs
- IISMapPath.cs
- ThemeDirectoryCompiler.cs
- ItemType.cs
- StoreAnnotationsMap.cs
- MouseGesture.cs
- ProfileSection.cs
- PropertyMappingExceptionEventArgs.cs
- ScriptingRoleServiceSection.cs
- Tokenizer.cs
- contentDescriptor.cs
- HttpServerVarsCollection.cs
- NavigationHelper.cs
- StreamSecurityUpgradeInitiator.cs
- EnumerableRowCollectionExtensions.cs
- DataControlPagerLinkButton.cs
- Line.cs
- DataSourceXmlSerializer.cs
- ResourcesBuildProvider.cs