Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Binary / UnionQueryOperator.cs / 1305376 / UnionQueryOperator.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// UnionQueryOperator.cs
//
//[....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// Operator that yields the union of two data sources.
    /// </summary>
    /// <typeparam name="TInputOutput">Element type of both inputs and of the output.</typeparam>
    internal sealed class UnionQueryOperator<TInputOutput> :
        BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
    {
        private readonly IEqualityComparer<TInputOutput> m_comparer; // An equality comparer (may be null).

        //---------------------------------------------------------------------------------------
        // Constructs a new union operator over the two child queries. The output is flagged as
        // ordered when either child's output is ordered.
        //

        internal UnionQueryOperator(
            ParallelQuery<TInputOutput> left, ParallelQuery<TInputOutput> right,
            IEqualityComparer<TInputOutput> comparer)
            : base(left, right)
        {
            Contract.Assert(left != null && right != null, "child data sources cannot be null");

            m_comparer = comparer;
            m_outputOrdered = LeftChild.OutputOrdered || RightChild.OutputOrdered;
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        //

        internal override QueryResults<TInputOutput> Open(
            QuerySettings settings, bool preferStriping)
        {
            // We just open our child operators, left and then right.  Do not propagate the
            // preferStriping value, but instead explicitly set it to false. Regardless of whether
            // the parent prefers striping or range partitioning, the output will be
            // hash-partitioned.
            QueryResults<TInputOutput> leftChildResults = LeftChild.Open(settings, false);
            QueryResults<TInputOutput> rightChildResults = RightChild.Open(settings, false);

            return new BinaryQueryOperatorResults(leftChildResults, rightChildResults, this, settings, false);
        }

        //---------------------------------------------------------------------------------------
        // Hash-repartitions the left stream (with the order-preserving variant when the left
        // child is ordered, otherwise the plain variant, which yields int keys) and hands over
        // to the helper that does the same for the right stream. preferStriping is not used.
        //

        public override void WrapPartitionedStream<TLeftKey, TRightKey>(
            PartitionedStream<TInputOutput, TLeftKey> leftStream,
            PartitionedStream<TInputOutput, TRightKey> rightStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient,
            bool preferStriping, QuerySettings settings)
        {
            Contract.Assert(leftStream.PartitionCount == rightStream.PartitionCount);
            int partitionCount = leftStream.PartitionCount;

            // Wrap both child streams with hash repartition

            if (LeftChild.OutputOrdered)
            {
                PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream =
                    ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken);

                WrapPartitionedStreamFixedLeftType<TLeftKey, TRightKey>(
                    leftHashStream, rightStream, outputRecipient, partitionCount,
                    settings.CancellationState.MergedCancellationToken);
            }
            else
            {
                PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> leftHashStream =
                    ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken);

                WrapPartitionedStreamFixedLeftType<int, TRightKey>(
                    leftHashStream, rightStream, outputRecipient, partitionCount,
                    settings.CancellationState.MergedCancellationToken);
            }
        }

        //---------------------------------------------------------------------------------------
        // A helper method that allows WrapPartitionedStream to fix the TLeftKey type parameter.
        // It hash-repartitions the right stream, choosing the ordered or unordered variant from
        // RightChild.OutputOrdered.
        //

        private void WrapPartitionedStreamFixedLeftType<TLeftKey, TRightKey>(
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream,
            PartitionedStream<TInputOutput, TRightKey> rightStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient,
            int partitionCount, CancellationToken cancellationToken)
        {
            if (RightChild.OutputOrdered)
            {
                PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightHashStream =
                    ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
                        rightStream, null, null, m_comparer, cancellationToken);

                WrapPartitionedStreamFixedBothTypes<TLeftKey, TRightKey>(
                    leftHashStream, rightHashStream, outputRecipient, partitionCount, cancellationToken);
            }
            else
            {
                PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightHashStream =
                    ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
                        rightStream, null, null, m_comparer, cancellationToken);

                WrapPartitionedStreamFixedBothTypes<TLeftKey, int>(
                    leftHashStream, rightHashStream, outputRecipient, partitionCount, cancellationToken);
            }
        }

        //---------------------------------------------------------------------------------------
        // A helper method that allows WrapPartitionedStreamFixedLeftType to fix the TRightKey
        // type parameter. Builds one union enumerator per partition and passes the resulting
        // stream to the recipient. The output stream is always marked OrdinalIndexState.Shuffled.
        //

        private void WrapPartitionedStreamFixedBothTypes<TLeftKey, TRightKey>(
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream,
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightHashStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient,
            int partitionCount, CancellationToken cancellationToken)
        {
            if (LeftChild.OutputOrdered || RightChild.OutputOrdered)
            {
                // At least one side carries order keys: the output key is the concatenation of
                // the left and right keys, compared with a comparer built from both sides.
                IComparer<ConcatKey<TLeftKey, TRightKey>> compoundKeyComparer =
                    ConcatKey<TLeftKey, TRightKey>.MakeComparer(
                        leftHashStream.KeyComparer, rightHashStream.KeyComparer);

                PartitionedStream<TInputOutput, ConcatKey<TLeftKey, TRightKey>> outputStream =
                    new PartitionedStream<TInputOutput, ConcatKey<TLeftKey, TRightKey>>(
                        partitionCount, compoundKeyComparer, OrdinalIndexState.Shuffled);

                for (int i = 0; i < partitionCount; i++)
                {
                    outputStream[i] = new OrderedUnionQueryOperatorEnumerator<TLeftKey, TRightKey>(
                        leftHashStream[i], rightHashStream[i],
                        LeftChild.OutputOrdered, RightChild.OutputOrdered,
                        m_comparer, compoundKeyComparer, cancellationToken);
                }

                outputRecipient.Receive(outputStream);
            }
            else
            {
                PartitionedStream<TInputOutput, int> outputStream =
                    new PartitionedStream<TInputOutput, int>(
                        partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);

                for (int i = 0; i < partitionCount; i++)
                {
                    outputStream[i] = new UnionQueryOperatorEnumerator<TLeftKey, TRightKey>(
                        leftHashStream[i], rightHashStream[i], i, m_comparer, cancellationToken);
                }

                outputRecipient.Receive(outputStream);
            }
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially: both children
        // are wrapped so they observe the token, then combined with Enumerable.Union.
        //

        internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TInputOutput> wrappedLeftChild =
                CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
            IEnumerable<TInputOutput> wrappedRightChild =
                CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);
            return wrappedLeftChild.Union(wrappedRightChild, m_comparer);
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //---------------------------------------------------------------------------------------
        // This enumerator performs the union operation incrementally. It does this by maintaining
        // a history -- in the form of a set -- of all data already seen. It is careful not to
        // return any duplicates.
        //

        private sealed class UnionQueryOperatorEnumerator<TLeftKey, TRightKey> :
            QueryOperatorEnumerator<TInputOutput, int>
        {
            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> m_rightSource; // Right data source.
            private readonly int m_partitionIndex; // The current partition (stored but not read in this class).
            private Set<TInputOutput> m_hashLookup; // The hash lookup, used to produce the union.
            private readonly CancellationToken m_cancellationToken;
            private Shared<int> m_outputLoopCount; // Poll counter for the right-hand loop; survives across MoveNext calls.
            private readonly IEqualityComparer<TInputOutput> m_comparer;

            //---------------------------------------------------------------------------------------
            // Instantiates a new union operator.
            //

            internal UnionQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightSource,
                int partitionIndex, IEqualityComparer<TInputOutput> comparer,
                CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(rightSource != null);

                m_leftSource = leftSource;
                m_rightSource = rightSource;
                m_partitionIndex = partitionIndex;
                m_comparer = comparer;
                m_cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, left and then right, to produce the union. Returns true
            // and sets currentElement for each element not seen before; returns false once both
            // sources are exhausted. currentKey is only written in DEBUG builds (with a marker
            // value); in other builds it is left untouched.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
            {
                // Lazily allocate the "seen" set and the shared poll counter on first use.
                if (m_hashLookup == null)
                {
                    m_hashLookup = new Set<TInputOutput>(m_comparer);
                    m_outputLoopCount = new Shared<int>(0);
                }

                Contract.Assert(m_hashLookup != null);

                // Enumerate the left and then right data source. When each is done, we set the
                // field to null so we will skip it upon subsequent calls to MoveNext.
                if (m_leftSource != null)
                {
                    // Iterate over this set's elements until we find a unique element.
                    TLeftKey keyUnused = default(TLeftKey);
                    Pair<TInputOutput, NoKeyMemoizationRequired> currentLeftElement =
                        default(Pair<TInputOutput, NoKeyMemoizationRequired>);

                    // This counter is local, so it restarts at zero on every call and the
                    // first iteration of each call always polls for cancellation.
                    int i = 0;
                    while (m_leftSource.MoveNext(ref currentLeftElement, ref keyUnused))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        // We ensure we never return duplicates by tracking them in our set.
                        if (m_hashLookup.Add(currentLeftElement.First))
                        {
#if DEBUG
                            currentKey = unchecked((int)0xdeadbeef);
#endif
                            currentElement = currentLeftElement.First;
                            return true;
                        }
                    }

                    m_leftSource.Dispose();
                    m_leftSource = null;
                }

                if (m_rightSource != null)
                {
                    // Iterate over this set's elements until we find a unique element.
                    TRightKey keyUnused = default(TRightKey);
                    Pair<TInputOutput, NoKeyMemoizationRequired> currentRightElement =
                        default(Pair<TInputOutput, NoKeyMemoizationRequired>);

                    while (m_rightSource.MoveNext(ref currentRightElement, ref keyUnused))
                    {
                        // Unlike the left loop, this counter persists between calls.
                        if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        // We ensure we never return duplicates by tracking them in our set.
                        if (m_hashLookup.Add(currentRightElement.First))
                        {
#if DEBUG
                            currentKey = unchecked((int)0xdeadbeef);
#endif
                            currentElement = currentRightElement.First;
                            return true;
                        }
                    }

                    m_rightSource.Dispose();
                    m_rightSource = null;
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes whichever sources have not already been disposed (and nulled) by MoveNext.
            //

            protected override void Dispose(bool disposing)
            {
                if (m_leftSource != null)
                {
                    m_leftSource.Dispose();
                }
                if (m_rightSource != null)
                {
                    m_rightSource.Dispose();
                }
            }
        }

        //---------------------------------------------------------------------------------------
        // Union enumerator used when at least one input is ordered. On the first MoveNext it
        // drains both sources into a dictionary keyed by element, keeping for each distinct
        // element the entry whose order key compares smallest, and then enumerates the
        // dictionary's values.
        //

        private sealed class OrderedUnionQueryOperatorEnumerator<TLeftKey, TRightKey> :
            QueryOperatorEnumerator<TInputOutput, ConcatKey<TLeftKey, TRightKey>>
        {
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> m_rightSource; // Right data source.
            private readonly IComparer<ConcatKey<TLeftKey, TRightKey>> m_keyComparer; // Comparer for compound order keys.
            private IEnumerator<KeyValuePair<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>>> m_outputEnumerator; // Enumerator over the output of the union.
            private readonly bool m_leftOrdered; // Whether the left data source is ordered.
            private readonly bool m_rightOrdered; // Whether the right data source is ordered.
            private readonly IEqualityComparer<TInputOutput> m_comparer; // Comparer for the elements.
            private readonly CancellationToken m_cancellationToken;

            //---------------------------------------------------------------------------------------
            // Instantiates a new union operator. A null element comparer is replaced with
            // EqualityComparer<TInputOutput>.Default.
            //

            internal OrderedUnionQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightSource,
                bool leftOrdered, bool rightOrdered, IEqualityComparer<TInputOutput> comparer,
                IComparer<ConcatKey<TLeftKey, TRightKey>> keyComparer,
                CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(rightSource != null);

                m_leftSource = leftSource;
                m_rightSource = rightSource;
                m_keyComparer = keyComparer;

                m_leftOrdered = leftOrdered;
                m_rightOrdered = rightOrdered;
                m_comparer = comparer;

                if (m_comparer == null)
                {
                    m_comparer = EqualityComparer<TInputOutput>.Default;
                }

                m_cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, left and then right, to produce the union. All input is
            // consumed on the first call; later calls just advance over the buffered result.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref ConcatKey<TLeftKey, TRightKey> currentKey)
            {
                Contract.Assert(m_leftSource != null);
                Contract.Assert(m_rightSource != null);

                if (m_outputEnumerator == null)
                {
                    // Elements are wrapped before being used as dictionary keys.
                    // NOTE(review): presumably so that null elements can be keys -- confirm
                    // against Wrapper / WrapperEqualityComparer.
                    IEqualityComparer<Wrapper<TInputOutput>> wrapperComparer =
                        new WrapperEqualityComparer<TInputOutput>(m_comparer);
                    Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>> union =
                        new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>>(wrapperComparer);

                    Pair<TInputOutput, NoKeyMemoizationRequired> elem =
                        default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    TLeftKey leftKey = default(TLeftKey);

                    // One poll counter is shared by both loops below.
                    int i = 0;
                    while (m_leftSource.MoveNext(ref elem, ref leftKey))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        // An unordered side contributes the default key value.
                        ConcatKey<TLeftKey, TRightKey> key =
                            ConcatKey<TLeftKey, TRightKey>.MakeLeft(m_leftOrdered ? leftKey : default(TLeftKey));
                        Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>> oldEntry;
                        Wrapper<TInputOutput> wrappedElem = new Wrapper<TInputOutput>(elem.First);

                        // Insert new elements; replace an existing one only if this key sorts earlier.
                        if (!union.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(key, oldEntry.Second) < 0)
                        {
                            union[wrappedElem] = new Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>(elem.First, key);
                        }
                    }

                    TRightKey rightKey = default(TRightKey);
                    while (m_rightSource.MoveNext(ref elem, ref rightKey))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        ConcatKey<TLeftKey, TRightKey> key =
                            ConcatKey<TLeftKey, TRightKey>.MakeRight(m_rightOrdered ? rightKey : default(TRightKey));
                        Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>> oldEntry;
                        Wrapper<TInputOutput> wrappedElem = new Wrapper<TInputOutput>(elem.First);

                        if (!union.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(key, oldEntry.Second) < 0)
                        {
                            union[wrappedElem] = new Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>(elem.First, key);
                        }
                    }

                    m_outputEnumerator = union.GetEnumerator();
                }

                if (m_outputEnumerator.MoveNext())
                {
                    Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>> current = m_outputEnumerator.Current.Value;
                    currentElement = current.First;
                    currentKey = current.Second;
                    return true;
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes both sources. Unlike the unordered enumerator, the source fields are never
            // nulled out, so both are expected to be non-null here.
            //

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_leftSource != null && m_rightSource != null);
                m_leftSource.Dispose();
                m_rightSource.Dispose();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- PartialList.cs
- WebPartConnectionsCancelEventArgs.cs
- DataBoundControl.cs
- QilInvokeLateBound.cs
- WindowsScrollBarBits.cs
- ConvertTextFrag.cs
- CellParagraph.cs
- MetadataItemEmitter.cs
- SerializationFieldInfo.cs
- SynchronousChannel.cs
- WebPartConnectionsDisconnectVerb.cs
- SizeAnimation.cs
- FormsIdentity.cs
- LayoutUtils.cs
- ComContractElement.cs
- XmlNodeReader.cs
- DynamicValidatorEventArgs.cs
- DataGridViewRowPostPaintEventArgs.cs
- RepeatBehavior.cs
- RegexCharClass.cs
- DataColumnMappingCollection.cs
- SchemaMerger.cs
- XmlElementAttributes.cs
- ObjectSpanRewriter.cs
- DayRenderEvent.cs
- DataBoundControl.cs
- RepeaterItem.cs
- OptimalBreakSession.cs
- ResXDataNode.cs
- Run.cs
- SelfIssuedAuthRSAPKCS1SignatureFormatter.cs
- DBAsyncResult.cs
- UrlMapping.cs
- ConsumerConnectionPointCollection.cs
- XmlNamespaceDeclarationsAttribute.cs
- PrintDialog.cs
- AlternateView.cs
- MulticastDelegate.cs
- DoubleLinkListEnumerator.cs
- UnsafeNativeMethods.cs
- SchemaEntity.cs
- ProxyAttribute.cs
- TextInfo.cs
- MarkedHighlightComponent.cs
- HttpProfileGroupBase.cs
- LayoutEvent.cs
- WebConfigurationHostFileChange.cs
- arclist.cs
- ToolBarTray.cs
- ButtonChrome.cs
- OleDbEnumerator.cs
- IndexedString.cs
- SafeSecurityHandles.cs
- DBPropSet.cs
- FormViewUpdatedEventArgs.cs
- AnnotationDocumentPaginator.cs
- JavaScriptObjectDeserializer.cs
- Wildcard.cs
- TagMapCollection.cs
- DeobfuscatingStream.cs
- PowerEase.cs
- SimpleApplicationHost.cs
- TextBlockAutomationPeer.cs
- DebugView.cs
- ComponentRenameEvent.cs
- MouseButtonEventArgs.cs
- HttpContextWrapper.cs
- CmsInterop.cs
- HtmlTableCell.cs
- RawStylusInputCustomData.cs
- GACMembershipCondition.cs
- ArrayConverter.cs
- FileDialog_Vista.cs
- NativeMethods.cs
- UIElement.cs
- NavigationExpr.cs
- HyperLink.cs
- SoapSchemaExporter.cs
- HttpPostedFile.cs
- AgileSafeNativeMemoryHandle.cs
- BaseComponentEditor.cs
- RedistVersionInfo.cs
- Binding.cs
- InternalRelationshipCollection.cs
- BypassElementCollection.cs
- ValuePattern.cs
- ControlCollection.cs
- XmlName.cs
- ApplicationSettingsBase.cs
- DocumentCollection.cs
- UnsafeNativeMethods.cs
- NonVisualControlAttribute.cs
- DataObjectAttribute.cs
- BitmapEffectDrawingContextWalker.cs
- SimpleNameService.cs
- SecurityCriticalDataForSet.cs
- DictionaryBase.cs
- TextSyndicationContent.cs
- SystemResourceHost.cs
- WriteableBitmap.cs