Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / DistinctQueryOperator.cs / 1305376 / DistinctQueryOperator.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // DistinctQueryOperator.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// This operator yields all of the distinct elements in a single data set. It works quite /// like the above set operations, with the obvious difference being that it only accepts /// a single data source as input. /// ///internal sealed class DistinctQueryOperator : UnaryQueryOperator { private readonly IEqualityComparer m_comparer; // An (optional) equality comparer. //---------------------------------------------------------------------------------------- // Constructs a new distinction operator. // internal DistinctQueryOperator(IEnumerable source, IEqualityComparer comparer) :base(source) { Contract.Assert(source != null, "child data source cannot be null"); m_comparer = comparer; SetOrdinalIndexState(OrdinalIndexState.Shuffled); } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the child and wrapping it with // partitions as needed. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { // We just open our child operator. Do not propagate the preferStriping value, but // instead explicitly set it to false. Regardless of whether the parent prefers striping or range // partitioning, the output will be hash-partititioned. QueryResults childResults = Child.Open(settings, false); return new UnaryQueryOperatorResults(childResults, this, settings, false); } internal override void WrapPartitionedStream ( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) { // Hash-repartion the source stream if (OutputOrdered) { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartitionOrdered ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } else { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartition ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } } //--------------------------------------------------------------------------------------- // This is a helper method. WrapPartitionedStream decides what type TKey is going // to be, and then call this method with that key as a generic parameter. // private void WrapPartitionedStreamHelper ( PartitionedStream , TKey> hashStream, IPartitionedStreamRecipient recipient, CancellationToken cancellationToken) { int partitionCount = hashStream.PartitionCount; PartitionedStream outputStream = new PartitionedStream (partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled); for (int i = 0; i < partitionCount; i++) { if (OutputOrdered) { outputStream[i] = new OrderedDistinctQueryOperatorEnumerator (hashStream[i], m_comparer, hashStream.KeyComparer, cancellationToken); } else { outputStream[i] = (QueryOperatorEnumerator )(object) new DistinctQueryOperatorEnumerator (hashStream[i], m_comparer, cancellationToken); } } recipient.Receive(outputStream); } //--------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return false; } } //---------------------------------------------------------------------------------------- // This enumerator performs the distinct operation incrementally. It does this by // maintaining a history -- in the form of a set -- of all data already seen. It simply // then doesn't return elements it has already seen before. // class DistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Set m_hashLookup; // The hash lookup, used to produce the distinct set. private CancellationToken m_cancellationToken; private Shared m_outputLoopCount; // Allocated in MoveNext to avoid false sharing. //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal DistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; // @ m_hashLookup = new Set (comparer); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); // Iterate over this set's elements until we find a unique element. TKey keyUnused = default(TKey); Pair current = default(Pair ); if (m_outputLoopCount == null) m_outputLoopCount = new Shared (0); while (m_source.MoveNext(ref current, ref keyUnused)) { if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We ensure we never return duplicates by tracking them in our set. if (m_hashLookup.Add(current.First)) { #if DEBUG currentKey = unchecked((int)0xdeadbeef); #endif currentElement = current.First; return true; } } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); } } //---------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { IEnumerable wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); return wrappedChild.Distinct(m_comparer); } class OrderedDistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Dictionary , TKey> m_hashLookup; // The hash lookup, used to produce the distinct set. private IComparer m_keyComparer; // Comparer to decide the key order. private IEnumerator , TKey>> m_hashLookupEnumerator; // Enumerates over m_hashLookup. private CancellationToken m_cancellationToken; //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal OrderedDistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, IComparer keyComparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; m_keyComparer = keyComparer; // @ m_hashLookup = new Dictionary , TKey>( new WrapperEqualityComparer (comparer)); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref TKey currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); if (m_hashLookupEnumerator == null) { Pair elem = default(Pair ); TKey orderKey = default(TKey); int i = 0; while (m_source.MoveNext(ref elem, ref orderKey)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // For each element, we track the smallest order key for that element that we saw so far TKey oldEntry; Wrapper wrappedElem = new Wrapper (elem.First); // If this is the first occurence of this element, or the order key is lower than all keys we saw previously, // update the order key for this element. if (!m_hashLookup.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(orderKey, oldEntry) < 0) { // For each "elem" value, we store the smallest key, and the element value that had that key. // Note that even though two element values are "equal" according to the EqualityComparer, // we still cannot choose arbitrarily which of the two to yield. m_hashLookup[wrappedElem] = orderKey; } } m_hashLookupEnumerator = m_hashLookup.GetEnumerator(); } if (m_hashLookupEnumerator.MoveNext()) { KeyValuePair , TKey> currentPair = m_hashLookupEnumerator.Current; currentElement = currentPair.Key.Value; currentKey = currentPair.Value; return true; } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); if (m_hashLookupEnumerator != null) { m_hashLookupEnumerator.Dispose(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // DistinctQueryOperator.cs // // [....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// This operator yields all of the distinct elements in a single data set. It works quite /// like the above set operations, with the obvious difference being that it only accepts /// a single data source as input. /// ///internal sealed class DistinctQueryOperator : UnaryQueryOperator { private readonly IEqualityComparer m_comparer; // An (optional) equality comparer. //---------------------------------------------------------------------------------------- // Constructs a new distinction operator. // internal DistinctQueryOperator(IEnumerable source, IEqualityComparer comparer) :base(source) { Contract.Assert(source != null, "child data source cannot be null"); m_comparer = comparer; SetOrdinalIndexState(OrdinalIndexState.Shuffled); } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the child and wrapping it with // partitions as needed. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { // We just open our child operator. Do not propagate the preferStriping value, but // instead explicitly set it to false. Regardless of whether the parent prefers striping or range // partitioning, the output will be hash-partititioned. QueryResults childResults = Child.Open(settings, false); return new UnaryQueryOperatorResults(childResults, this, settings, false); } internal override void WrapPartitionedStream ( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) { // Hash-repartion the source stream if (OutputOrdered) { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartitionOrdered ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } else { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartition ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } } //--------------------------------------------------------------------------------------- // This is a helper method. WrapPartitionedStream decides what type TKey is going // to be, and then call this method with that key as a generic parameter. // private void WrapPartitionedStreamHelper ( PartitionedStream , TKey> hashStream, IPartitionedStreamRecipient recipient, CancellationToken cancellationToken) { int partitionCount = hashStream.PartitionCount; PartitionedStream outputStream = new PartitionedStream (partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled); for (int i = 0; i < partitionCount; i++) { if (OutputOrdered) { outputStream[i] = new OrderedDistinctQueryOperatorEnumerator (hashStream[i], m_comparer, hashStream.KeyComparer, cancellationToken); } else { outputStream[i] = (QueryOperatorEnumerator )(object) new DistinctQueryOperatorEnumerator (hashStream[i], m_comparer, cancellationToken); } } recipient.Receive(outputStream); } //--------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return false; } } //---------------------------------------------------------------------------------------- // This enumerator performs the distinct operation incrementally. It does this by // maintaining a history -- in the form of a set -- of all data already seen. It simply // then doesn't return elements it has already seen before. // class DistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Set m_hashLookup; // The hash lookup, used to produce the distinct set. private CancellationToken m_cancellationToken; private Shared m_outputLoopCount; // Allocated in MoveNext to avoid false sharing. //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal DistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; // @ m_hashLookup = new Set (comparer); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); // Iterate over this set's elements until we find a unique element. TKey keyUnused = default(TKey); Pair current = default(Pair ); if (m_outputLoopCount == null) m_outputLoopCount = new Shared (0); while (m_source.MoveNext(ref current, ref keyUnused)) { if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We ensure we never return duplicates by tracking them in our set. if (m_hashLookup.Add(current.First)) { #if DEBUG currentKey = unchecked((int)0xdeadbeef); #endif currentElement = current.First; return true; } } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); } } //---------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { IEnumerable wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); return wrappedChild.Distinct(m_comparer); } class OrderedDistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Dictionary , TKey> m_hashLookup; // The hash lookup, used to produce the distinct set. private IComparer m_keyComparer; // Comparer to decide the key order. private IEnumerator , TKey>> m_hashLookupEnumerator; // Enumerates over m_hashLookup. private CancellationToken m_cancellationToken; //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal OrderedDistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, IComparer keyComparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; m_keyComparer = keyComparer; // @ m_hashLookup = new Dictionary , TKey>( new WrapperEqualityComparer (comparer)); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref TKey currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); if (m_hashLookupEnumerator == null) { Pair elem = default(Pair ); TKey orderKey = default(TKey); int i = 0; while (m_source.MoveNext(ref elem, ref orderKey)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // For each element, we track the smallest order key for that element that we saw so far TKey oldEntry; Wrapper wrappedElem = new Wrapper (elem.First); // If this is the first occurence of this element, or the order key is lower than all keys we saw previously, // update the order key for this element. if (!m_hashLookup.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(orderKey, oldEntry) < 0) { // For each "elem" value, we store the smallest key, and the element value that had that key. // Note that even though two element values are "equal" according to the EqualityComparer, // we still cannot choose arbitrarily which of the two to yield. m_hashLookup[wrappedElem] = orderKey; } } m_hashLookupEnumerator = m_hashLookup.GetEnumerator(); } if (m_hashLookupEnumerator.MoveNext()) { KeyValuePair , TKey> currentPair = m_hashLookupEnumerator.Current; currentElement = currentPair.Key.Value; currentKey = currentPair.Value; return true; } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); if (m_hashLookupEnumerator != null) { m_hashLookupEnumerator.Dispose(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- LinqDataSource.cs
- DataGridAutoFormatDialog.cs
- HtmlInputText.cs
- BaseParagraph.cs
- AutoResetEvent.cs
- TemplateBindingExpression.cs
- UpdateManifestForBrowserApplication.cs
- SynchronizedMessageSource.cs
- EventLogPermissionEntryCollection.cs
- HMACSHA512.cs
- Speller.cs
- SqlStatistics.cs
- DataGridViewTopLeftHeaderCell.cs
- MediaCommands.cs
- Label.cs
- HostAdapter.cs
- RequiredFieldValidator.cs
- ToolStripItem.cs
- XmlElementList.cs
- SqlInternalConnectionTds.cs
- PackageProperties.cs
- WebPartTransformerCollection.cs
- SafeCertificateStore.cs
- SynchronizationLockException.cs
- serverconfig.cs
- XNameTypeConverter.cs
- DispatcherSynchronizationContext.cs
- XPathParser.cs
- ResourceDisplayNameAttribute.cs
- ObjectSpanRewriter.cs
- sqlpipe.cs
- HttpResponse.cs
- IxmlLineInfo.cs
- TableLayoutColumnStyleCollection.cs
- HttpCacheVaryByContentEncodings.cs
- FixedPosition.cs
- NavigationService.cs
- BaseDataList.cs
- DefaultDialogButtons.cs
- MetaModel.cs
- ContentElementAutomationPeer.cs
- ParentQuery.cs
- XamlLoadErrorInfo.cs
- DataControlPagerLinkButton.cs
- WsatTransactionInfo.cs
- PropertyAccessVisitor.cs
- COM2ColorConverter.cs
- SystemFonts.cs
- XmlValidatingReader.cs
- Suspend.cs
- XmlCharType.cs
- WebBrowserProgressChangedEventHandler.cs
- ExcludeFromCodeCoverageAttribute.cs
- FixedDocumentPaginator.cs
- FigureParaClient.cs
- GroupBoxAutomationPeer.cs
- CharEnumerator.cs
- _WebProxyDataBuilder.cs
- ImageMapEventArgs.cs
- CroppedBitmap.cs
- SQLConvert.cs
- BmpBitmapEncoder.cs
- XmlSchemaRedefine.cs
- DbDataAdapter.cs
- SafeReversePInvokeHandle.cs
- FormatException.cs
- DataViewSettingCollection.cs
- PrincipalPermission.cs
- MSAANativeProvider.cs
- UrlAuthorizationModule.cs
- SystemWebExtensionsSectionGroup.cs
- CLSCompliantAttribute.cs
- WS2007FederationHttpBindingElement.cs
- SettingsPropertyValue.cs
- EpmSourcePathSegment.cs
- StickyNoteContentControl.cs
- RepeaterCommandEventArgs.cs
- ReferenceService.cs
- TreeNodeSelectionProcessor.cs
- HttpProfileGroupBase.cs
- InputScopeManager.cs
- SHA256Managed.cs
- UnaryNode.cs
- LocalTransaction.cs
- DecimalAnimationBase.cs
- CheckBox.cs
- HtmlEmptyTagControlBuilder.cs
- CancelEventArgs.cs
- AQNBuilder.cs
- infer.cs
- PageAsyncTask.cs
- updatecommandorderer.cs
- DoubleAnimationBase.cs
- TitleStyle.cs
- DataGridViewRowPostPaintEventArgs.cs
- CqlGenerator.cs
- METAHEADER.cs
- NodeFunctions.cs
- safePerfProviderHandle.cs
- ZipIOExtraFieldZip64Element.cs