Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Binary / UnionQueryOperator.cs / 1305376 / UnionQueryOperator.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// UnionQueryOperator.cs
//
//[....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// Operator that yields the union of two data sources.
    /// </summary>
    /// <typeparam name="TInputOutput">
    /// The element type of both inputs and of the output.
    /// </typeparam>
    //
    // NOTE(review): the generic type-argument lists in this file were reconstructed after having
    // been stripped from the text. In particular the second type argument of Pair<,>
    // (NoKeyMemoizationRequired) and the generic arguments passed to ExchangeUtilities should be
    // confirmed against the declarations of HashRepartition / HashRepartitionOrdered.
    //
    internal sealed class UnionQueryOperator<TInputOutput> :
        BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
    {
        private readonly IEqualityComparer<TInputOutput> m_comparer; // An equality comparer.

        //---------------------------------------------------------------------------------------
        // Constructs a new union operator.
        //
        // Arguments:
        //     left     - the left data source (asserted non-null)
        //     right    - the right data source (asserted non-null)
        //     comparer - the element equality comparer; stored as given, may be null
        //
        // The operator's output is marked ordered if either child's output is ordered.
        //

        internal UnionQueryOperator(ParallelQuery<TInputOutput> left, ParallelQuery<TInputOutput> right, IEqualityComparer<TInputOutput> comparer)
            : base(left, right)
        {
            Contract.Assert(left != null && right != null, "child data sources cannot be null");

            m_comparer = comparer;
            m_outputOrdered = LeftChild.OutputOrdered || RightChild.OutputOrdered;
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        //

        internal override QueryResults<TInputOutput> Open(
            QuerySettings settings, bool preferStriping)
        {
            // We just open our child operators, left and then right.  Do not propagate the preferStriping value, but
            // instead explicitly set it to false. Regardless of whether the parent prefers striping or range
            // partitioning, the output will be hash-partitioned.
            QueryResults<TInputOutput> leftChildResults = LeftChild.Open(settings, false);
            QueryResults<TInputOutput> rightChildResults = RightChild.Open(settings, false);

            return new BinaryQueryOperatorResults(leftChildResults, rightChildResults, this, settings, false);
        }

        //---------------------------------------------------------------------------------------
        // Hash-repartitions the left stream (using the order-preserving variant when the left
        // child is ordered) and then hands off to WrapPartitionedStreamFixedLeftType, which does
        // the same for the right stream. The preferStriping argument is not consulted here.
        //

        public override void WrapPartitionedStream<TLeftKey, TRightKey>(
            PartitionedStream<TInputOutput, TLeftKey> leftStream, PartitionedStream<TInputOutput, TRightKey> rightStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, bool preferStriping, QuerySettings settings)
        {
            Contract.Assert(leftStream.PartitionCount == rightStream.PartitionCount);
            int partitionCount = leftStream.PartitionCount;

            // Wrap both child streams with hash repartition

            if (LeftChild.OutputOrdered)
            {
                PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream =
                    ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken);

                WrapPartitionedStreamFixedLeftType<TLeftKey, TRightKey>(
                    leftHashStream, rightStream, outputRecipient, partitionCount, settings.CancellationState.MergedCancellationToken);
            }
            else
            {
                // The unordered repartition yields int keys, so the left key type becomes int from here on.
                PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> leftHashStream =
                    ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken);

                WrapPartitionedStreamFixedLeftType<int, TRightKey>(
                    leftHashStream, rightStream, outputRecipient, partitionCount, settings.CancellationState.MergedCancellationToken);
            }
        }

        //---------------------------------------------------------------------------------------
        // A helper method that allows WrapPartitionedStream to fix the TLeftKey type parameter.
        // It hash-repartitions the right stream (order-preserving when the right child is
        // ordered) and forwards both repartitioned streams to WrapPartitionedStreamFixedBothTypes.
        //

        private void WrapPartitionedStreamFixedLeftType<TLeftKey, TRightKey>(
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream, PartitionedStream<TInputOutput, TRightKey> rightStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, int partitionCount, CancellationToken cancellationToken)
        {
            if (RightChild.OutputOrdered)
            {
                PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightHashStream =
                    ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
                        rightStream, null, null, m_comparer, cancellationToken);

                WrapPartitionedStreamFixedBothTypes<TLeftKey, TRightKey>(
                    leftHashStream, rightHashStream, outputRecipient, partitionCount, cancellationToken);
            }
            else
            {
                // The unordered repartition yields int keys, so the right key type becomes int from here on.
                PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightHashStream =
                    ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
                        rightStream, null, null, m_comparer, cancellationToken);

                WrapPartitionedStreamFixedBothTypes<TLeftKey, int>(
                    leftHashStream, rightHashStream, outputRecipient, partitionCount, cancellationToken);
            }
        }

        //---------------------------------------------------------------------------------------
        // A helper method that allows WrapPartitionedStreamFixedLeftType to fix the TRightKey
        // type parameter. Builds one union enumerator per partition and hands the resulting
        // output stream to the recipient. If either child is ordered, the ordered enumerator is
        // used and output keys are ConcatKey values; otherwise the output keys are ints.
        //

        private void WrapPartitionedStreamFixedBothTypes<TLeftKey, TRightKey>(
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream,
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightHashStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, int partitionCount,
            CancellationToken cancellationToken)
        {
            if (LeftChild.OutputOrdered || RightChild.OutputOrdered)
            {
                IComparer<ConcatKey<TLeftKey, TRightKey>> compoundKeyComparer =
                    ConcatKey<TLeftKey, TRightKey>.MakeComparer(leftHashStream.KeyComparer, rightHashStream.KeyComparer);

                PartitionedStream<TInputOutput, ConcatKey<TLeftKey, TRightKey>> outputStream =
                    new PartitionedStream<TInputOutput, ConcatKey<TLeftKey, TRightKey>>(partitionCount, compoundKeyComparer, OrdinalIndexState.Shuffled);

                for (int i = 0; i < partitionCount; i++)
                {
                    outputStream[i] = new OrderedUnionQueryOperatorEnumerator<TLeftKey, TRightKey>(
                        leftHashStream[i], rightHashStream[i], LeftChild.OutputOrdered, RightChild.OutputOrdered,
                        m_comparer, compoundKeyComparer, cancellationToken);
                }

                outputRecipient.Receive(outputStream);
            }
            else
            {
                PartitionedStream<TInputOutput, int> outputStream =
                    new PartitionedStream<TInputOutput, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);

                for (int i = 0; i < partitionCount; i++)
                {
                    outputStream[i] = new UnionQueryOperatorEnumerator<TLeftKey, TRightKey>(
                        leftHashStream[i], rightHashStream[i], i, m_comparer, cancellationToken);
                }

                outputRecipient.Receive(outputStream);
            }
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially. Both children
        // are wrapped so that enumeration observes the supplied cancellation token, and the
        // result is the LINQ-to-Objects Union of the two using this operator's comparer.
        //

        internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TInputOutput> wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
            IEnumerable<TInputOutput> wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);
            return wrappedLeftChild.Union(wrappedRightChild, m_comparer);
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //---------------------------------------------------------------------------------------
        // This enumerator performs the union operation incrementally. It does this by maintaining
        // a history -- in the form of a set -- of all data already seen. It is careful not to
        // return any duplicates.
        //

        class UnionQueryOperatorEnumerator<TLeftKey, TRightKey> : QueryOperatorEnumerator<TInputOutput, int>
        {
            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> m_rightSource; // Right data source.
            private readonly int m_partitionIndex; // The current partition. Stored but not read within this class.
            private Set<TInputOutput> m_hashLookup; // The hash lookup, used to produce the union.
            private CancellationToken m_cancellationToken;
            private Shared<int> m_outputLoopCount; // Iteration counter that drives cancellation polling in the right-hand loop.
            private readonly IEqualityComparer<TInputOutput> m_comparer;

            //---------------------------------------------------------------------------------------
            // Instantiates a new union operator.
            //

            internal UnionQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightSource,
                int partitionIndex, IEqualityComparer<TInputOutput> comparer,
                CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(rightSource != null);

                m_leftSource = leftSource;
                m_rightSource = rightSource;
                m_partitionIndex = partitionIndex;
                m_comparer = comparer;
                m_cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, left and then right, to produce the union.
            //
            // Returns true and sets currentElement when a not-yet-seen element is found; returns
            // false once both sources are exhausted. currentKey is only written in DEBUG builds,
            // where it receives a recognizable sentinel value.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
            {
                // The set and the loop counter are created lazily on the first call.
                if (m_hashLookup == null)
                {
                    m_hashLookup = new Set<TInputOutput>(m_comparer);
                    m_outputLoopCount = new Shared<int>(0);
                }

                Contract.Assert(m_hashLookup != null);

                // Enumerate the left and then right data source. When each is done, we set the
                // field to null so we will skip it upon subsequent calls to MoveNext.
                if (m_leftSource != null)
                {
                    // Iterate over this set's elements until we find a unique element.
                    TLeftKey keyUnused = default(TLeftKey);
                    Pair<TInputOutput, NoKeyMemoizationRequired> currentLeftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);

                    // This counter is local, so it restarts at zero on every MoveNext call and
                    // cancellation is therefore polled on the first iteration of each call.
                    int i = 0;
                    while (m_leftSource.MoveNext(ref currentLeftElement, ref keyUnused))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                            CancellationState.ThrowIfCanceled(m_cancellationToken);

                        // We ensure we never return duplicates by tracking them in our set.
                        if (m_hashLookup.Add(currentLeftElement.First))
                        {
#if DEBUG
                            currentKey = unchecked((int)0xdeadbeef);
#endif
                            currentElement = currentLeftElement.First;
                            return true;
                        }
                    }

                    m_leftSource.Dispose();
                    m_leftSource = null;
                }

                if (m_rightSource != null)
                {
                    // Iterate over this set's elements until we find a unique element.
                    TRightKey keyUnused = default(TRightKey);
                    Pair<TInputOutput, NoKeyMemoizationRequired> currentRightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);

                    while (m_rightSource.MoveNext(ref currentRightElement, ref keyUnused))
                    {
                        // Unlike the left loop, this counter persists across MoveNext calls.
                        if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
                            CancellationState.ThrowIfCanceled(m_cancellationToken);

                        // We ensure we never return duplicates by tracking them in our set.
                        if (m_hashLookup.Add(currentRightElement.First))
                        {
#if DEBUG
                            currentKey = unchecked((int)0xdeadbeef);
#endif
                            currentElement = currentRightElement.First;
                            return true;
                        }
                    }

                    m_rightSource.Dispose();
                    m_rightSource = null;
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes whichever child enumerators have not already been disposed (and nulled)
            // by MoveNext.
            //

            protected override void Dispose(bool disposing)
            {
                if (m_leftSource != null)
                {
                    m_leftSource.Dispose();
                }
                if (m_rightSource != null)
                {
                    m_rightSource.Dispose();
                }
            }
        }

        //---------------------------------------------------------------------------------------
        // The order-aware variant of the union enumerator. On the first MoveNext it drains both
        // sources into a dictionary keyed by element, keeping for each distinct element the
        // entry whose compound order key compares smallest, and then yields the dictionary's
        // contents together with those keys.
        //

        class OrderedUnionQueryOperatorEnumerator<TLeftKey, TRightKey> : QueryOperatorEnumerator<TInputOutput, ConcatKey<TLeftKey, TRightKey>>
        {
            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> m_rightSource; // Right data source.
            private IComparer<ConcatKey<TLeftKey, TRightKey>> m_keyComparer; // Comparer for compound order keys.
            private IEnumerator<KeyValuePair<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>>> m_outputEnumerator; // Enumerator over the output of the union.
            private bool m_leftOrdered; // Whether the left data source is ordered.
            private bool m_rightOrdered; // Whether the right data source is ordered.
            private IEqualityComparer<TInputOutput> m_comparer; // Comparer for the elements.
            private CancellationToken m_cancellationToken;

            //---------------------------------------------------------------------------------------
            // Instantiates a new union operator. A null element comparer is replaced with
            // EqualityComparer<TInputOutput>.Default.
            //

            internal OrderedUnionQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TRightKey> rightSource,
                bool leftOrdered, bool rightOrdered, IEqualityComparer<TInputOutput> comparer, IComparer<ConcatKey<TLeftKey, TRightKey>> keyComparer,
                CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(rightSource != null);

                m_leftSource = leftSource;
                m_rightSource = rightSource;
                m_keyComparer = keyComparer;

                m_leftOrdered = leftOrdered;
                m_rightOrdered = rightOrdered;
                m_comparer = comparer;

                if (m_comparer == null)
                {
                    m_comparer = EqualityComparer<TInputOutput>.Default;
                }

                m_cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, left and then right, to produce the union.
            //
            // The first call consumes both sources completely before anything is returned;
            // subsequent calls simply advance over the accumulated dictionary.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref ConcatKey<TLeftKey, TRightKey> currentKey)
            {
                Contract.Assert(m_leftSource != null);
                Contract.Assert(m_rightSource != null);

                if (m_outputEnumerator == null)
                {
                    // Elements are wrapped before being used as dictionary keys
                    // (presumably so that null elements are usable as keys -- confirm against Wrapper<T>).
                    IEqualityComparer<Wrapper<TInputOutput>> wrapperComparer = new WrapperEqualityComparer<TInputOutput>(m_comparer);
                    Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>> union =
                        new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>>(wrapperComparer);

                    Pair<TInputOutput, NoKeyMemoizationRequired> elem = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    TLeftKey leftKey = default(TLeftKey);

                    // A single counter drives cancellation polling across both loops.
                    int i = 0;
                    while (m_leftSource.MoveNext(ref elem, ref leftKey))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                            CancellationState.ThrowIfCanceled(m_cancellationToken);

                        // An unordered side contributes the default key value instead of its real key.
                        ConcatKey<TLeftKey, TRightKey> key =
                            ConcatKey<TLeftKey, TRightKey>.MakeLeft(m_leftOrdered ? leftKey : default(TLeftKey));
                        Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>> oldEntry;
                        Wrapper<TInputOutput> wrappedElem = new Wrapper<TInputOutput>(elem.First);

                        // Insert if unseen, or replace if this occurrence has a smaller order key.
                        if (!union.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(key, oldEntry.Second) < 0)
                        {
                            union[wrappedElem] = new Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>(elem.First, key);
                        }
                    }

                    TRightKey rightKey = default(TRightKey);
                    while (m_rightSource.MoveNext(ref elem, ref rightKey))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                            CancellationState.ThrowIfCanceled(m_cancellationToken);

                        ConcatKey<TLeftKey, TRightKey> key =
                            ConcatKey<TLeftKey, TRightKey>.MakeRight(m_rightOrdered ? rightKey : default(TRightKey));
                        Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>> oldEntry;
                        Wrapper<TInputOutput> wrappedElem = new Wrapper<TInputOutput>(elem.First);

                        // Insert if unseen, or replace if this occurrence has a smaller order key.
                        if (!union.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(key, oldEntry.Second) < 0)
                        {
                            union[wrappedElem] = new Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>>(elem.First, key);
                        }
                    }

                    m_outputEnumerator = union.GetEnumerator();
                }

                if (m_outputEnumerator.MoveNext())
                {
                    Pair<TInputOutput, ConcatKey<TLeftKey, TRightKey>> current = m_outputEnumerator.Current.Value;
                    currentElement = current.First;
                    currentKey = current.Second;
                    return true;
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes both child enumerators. Unlike the unordered enumerator, the sources are
            // never nulled by MoveNext, so both are expected to be non-null here.
            //

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_leftSource != null && m_rightSource != null);
                m_leftSource.Dispose();
                m_rightSource.Dispose();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- ImageList.cs
- Rectangle.cs
- UnaryExpression.cs
- BaseInfoTable.cs
- PingOptions.cs
- DbConnectionPool.cs
- CriticalHandle.cs
- AppDomainFactory.cs
- TextBoxBase.cs
- StaticDataManager.cs
- PanelStyle.cs
- OutputCacheSettingsSection.cs
- LinkLabel.cs
- _LazyAsyncResult.cs
- RadialGradientBrush.cs
- SendOperation.cs
- Random.cs
- XMLSchema.cs
- HttpModuleAction.cs
- BindingExpressionBase.cs
- LocalizationCodeDomSerializer.cs
- WindowsPrincipal.cs
- FilterQueryOptionExpression.cs
- WebExceptionStatus.cs
- ViewPort3D.cs
- BoolExpression.cs
- HtmlGenericControl.cs
- ObjectHelper.cs
- MultiView.cs
- DataGridViewSelectedRowCollection.cs
- DefaultPropertyAttribute.cs
- TemplateControlParser.cs
- WebDisplayNameAttribute.cs
- XsdBuilder.cs
- DependencyProperty.cs
- CodeSnippetCompileUnit.cs
- GradientBrush.cs
- _IPv6Address.cs
- LocalBuilder.cs
- OrCondition.cs
- StringAttributeCollection.cs
- SafeNativeMethods.cs
- ToolStripItem.cs
- RegistryPermission.cs
- KeysConverter.cs
- IndexOutOfRangeException.cs
- HTMLTextWriter.cs
- PersistenceContext.cs
- WaitHandleCannotBeOpenedException.cs
- TreeNode.cs
- PeerOutputChannel.cs
- NativeWindow.cs
- SessionStateSection.cs
- DataTableNameHandler.cs
- WebBrowserPermission.cs
- DataGridLinkButton.cs
- RepeatButtonAutomationPeer.cs
- DataBoundControlAdapter.cs
- MappedMetaModel.cs
- DBConnectionString.cs
- MetadataProperty.cs
- MulticastOption.cs
- MemoryRecordBuffer.cs
- URIFormatException.cs
- SiteMembershipCondition.cs
- PrivateUnsafeNativeCompoundFileMethods.cs
- HttpException.cs
- XMLSchema.cs
- DataGridLinkButton.cs
- QuaternionAnimationBase.cs
- TemplateBindingExtensionConverter.cs
- FontCacheLogic.cs
- WindowsIdentity.cs
- KeyTime.cs
- CodeDomLocalizationProvider.cs
- NullableFloatAverageAggregationOperator.cs
- DataGridViewSelectedCellCollection.cs
- HttpProtocolImporter.cs
- ImageButton.cs
- IdentitySection.cs
- PeerNearMe.cs
- UnmanagedMemoryStreamWrapper.cs
- StorageFunctionMapping.cs
- MouseCaptureWithinProperty.cs
- SqlDataSourceDesigner.cs
- AtomParser.cs
- CompilerResults.cs
- TypeForwardedFromAttribute.cs
- SmtpSpecifiedPickupDirectoryElement.cs
- LinkConverter.cs
- TimeoutException.cs
- UnsafeNativeMethods.cs
- ResourcePermissionBaseEntry.cs
- LayoutEngine.cs
- ListSortDescriptionCollection.cs
- _AutoWebProxyScriptEngine.cs
- WebPartHeaderCloseVerb.cs
- ExpressionBindingCollection.cs
- SessionStateUtil.cs
- PreProcessor.cs