Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / DistinctQueryOperator.cs / 1305376 / DistinctQueryOperator.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // DistinctQueryOperator.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// This operator yields all of the distinct elements in a single data set. It works quite /// like the above set operations, with the obvious difference being that it only accepts /// a single data source as input. /// ///internal sealed class DistinctQueryOperator : UnaryQueryOperator { private readonly IEqualityComparer m_comparer; // An (optional) equality comparer. //---------------------------------------------------------------------------------------- // Constructs a new distinction operator. // internal DistinctQueryOperator(IEnumerable source, IEqualityComparer comparer) :base(source) { Contract.Assert(source != null, "child data source cannot be null"); m_comparer = comparer; SetOrdinalIndexState(OrdinalIndexState.Shuffled); } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the child and wrapping it with // partitions as needed. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { // We just open our child operator. Do not propagate the preferStriping value, but // instead explicitly set it to false. Regardless of whether the parent prefers striping or range // partitioning, the output will be hash-partititioned. QueryResults childResults = Child.Open(settings, false); return new UnaryQueryOperatorResults(childResults, this, settings, false); } internal override void WrapPartitionedStream ( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) { // Hash-repartion the source stream if (OutputOrdered) { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartitionOrdered ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } else { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartition ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } } //--------------------------------------------------------------------------------------- // This is a helper method. WrapPartitionedStream decides what type TKey is going // to be, and then call this method with that key as a generic parameter. // private void WrapPartitionedStreamHelper ( PartitionedStream , TKey> hashStream, IPartitionedStreamRecipient recipient, CancellationToken cancellationToken) { int partitionCount = hashStream.PartitionCount; PartitionedStream outputStream = new PartitionedStream (partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled); for (int i = 0; i < partitionCount; i++) { if (OutputOrdered) { outputStream[i] = new OrderedDistinctQueryOperatorEnumerator (hashStream[i], m_comparer, hashStream.KeyComparer, cancellationToken); } else { outputStream[i] = (QueryOperatorEnumerator )(object) new DistinctQueryOperatorEnumerator (hashStream[i], m_comparer, cancellationToken); } } recipient.Receive(outputStream); } //--------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return false; } } //---------------------------------------------------------------------------------------- // This enumerator performs the distinct operation incrementally. It does this by // maintaining a history -- in the form of a set -- of all data already seen. It simply // then doesn't return elements it has already seen before. // class DistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Set m_hashLookup; // The hash lookup, used to produce the distinct set. private CancellationToken m_cancellationToken; private Shared m_outputLoopCount; // Allocated in MoveNext to avoid false sharing. //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal DistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; // @ m_hashLookup = new Set (comparer); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); // Iterate over this set's elements until we find a unique element. TKey keyUnused = default(TKey); Pair current = default(Pair ); if (m_outputLoopCount == null) m_outputLoopCount = new Shared (0); while (m_source.MoveNext(ref current, ref keyUnused)) { if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We ensure we never return duplicates by tracking them in our set. if (m_hashLookup.Add(current.First)) { #if DEBUG currentKey = unchecked((int)0xdeadbeef); #endif currentElement = current.First; return true; } } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); } } //---------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { IEnumerable wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); return wrappedChild.Distinct(m_comparer); } class OrderedDistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Dictionary , TKey> m_hashLookup; // The hash lookup, used to produce the distinct set. private IComparer m_keyComparer; // Comparer to decide the key order. private IEnumerator , TKey>> m_hashLookupEnumerator; // Enumerates over m_hashLookup. private CancellationToken m_cancellationToken; //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal OrderedDistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, IComparer keyComparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; m_keyComparer = keyComparer; // @ m_hashLookup = new Dictionary , TKey>( new WrapperEqualityComparer (comparer)); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref TKey currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); if (m_hashLookupEnumerator == null) { Pair elem = default(Pair ); TKey orderKey = default(TKey); int i = 0; while (m_source.MoveNext(ref elem, ref orderKey)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // For each element, we track the smallest order key for that element that we saw so far TKey oldEntry; Wrapper wrappedElem = new Wrapper (elem.First); // If this is the first occurence of this element, or the order key is lower than all keys we saw previously, // update the order key for this element. if (!m_hashLookup.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(orderKey, oldEntry) < 0) { // For each "elem" value, we store the smallest key, and the element value that had that key. // Note that even though two element values are "equal" according to the EqualityComparer, // we still cannot choose arbitrarily which of the two to yield. m_hashLookup[wrappedElem] = orderKey; } } m_hashLookupEnumerator = m_hashLookup.GetEnumerator(); } if (m_hashLookupEnumerator.MoveNext()) { KeyValuePair , TKey> currentPair = m_hashLookupEnumerator.Current; currentElement = currentPair.Key.Value; currentKey = currentPair.Value; return true; } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); if (m_hashLookupEnumerator != null) { m_hashLookupEnumerator.Dispose(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // DistinctQueryOperator.cs // // [....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// This operator yields all of the distinct elements in a single data set. It works quite /// like the above set operations, with the obvious difference being that it only accepts /// a single data source as input. /// ///internal sealed class DistinctQueryOperator : UnaryQueryOperator { private readonly IEqualityComparer m_comparer; // An (optional) equality comparer. //---------------------------------------------------------------------------------------- // Constructs a new distinction operator. // internal DistinctQueryOperator(IEnumerable source, IEqualityComparer comparer) :base(source) { Contract.Assert(source != null, "child data source cannot be null"); m_comparer = comparer; SetOrdinalIndexState(OrdinalIndexState.Shuffled); } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the child and wrapping it with // partitions as needed. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { // We just open our child operator. Do not propagate the preferStriping value, but // instead explicitly set it to false. Regardless of whether the parent prefers striping or range // partitioning, the output will be hash-partititioned. QueryResults childResults = Child.Open(settings, false); return new UnaryQueryOperatorResults(childResults, this, settings, false); } internal override void WrapPartitionedStream ( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) { // Hash-repartion the source stream if (OutputOrdered) { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartitionOrdered ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } else { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartition ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } } //--------------------------------------------------------------------------------------- // This is a helper method. WrapPartitionedStream decides what type TKey is going // to be, and then call this method with that key as a generic parameter. // private void WrapPartitionedStreamHelper ( PartitionedStream , TKey> hashStream, IPartitionedStreamRecipient recipient, CancellationToken cancellationToken) { int partitionCount = hashStream.PartitionCount; PartitionedStream outputStream = new PartitionedStream (partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled); for (int i = 0; i < partitionCount; i++) { if (OutputOrdered) { outputStream[i] = new OrderedDistinctQueryOperatorEnumerator (hashStream[i], m_comparer, hashStream.KeyComparer, cancellationToken); } else { outputStream[i] = (QueryOperatorEnumerator )(object) new DistinctQueryOperatorEnumerator (hashStream[i], m_comparer, cancellationToken); } } recipient.Receive(outputStream); } //--------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return false; } } //---------------------------------------------------------------------------------------- // This enumerator performs the distinct operation incrementally. It does this by // maintaining a history -- in the form of a set -- of all data already seen. It simply // then doesn't return elements it has already seen before. // class DistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Set m_hashLookup; // The hash lookup, used to produce the distinct set. private CancellationToken m_cancellationToken; private Shared m_outputLoopCount; // Allocated in MoveNext to avoid false sharing. //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal DistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; // @ m_hashLookup = new Set (comparer); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); // Iterate over this set's elements until we find a unique element. TKey keyUnused = default(TKey); Pair current = default(Pair ); if (m_outputLoopCount == null) m_outputLoopCount = new Shared (0); while (m_source.MoveNext(ref current, ref keyUnused)) { if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We ensure we never return duplicates by tracking them in our set. if (m_hashLookup.Add(current.First)) { #if DEBUG currentKey = unchecked((int)0xdeadbeef); #endif currentElement = current.First; return true; } } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); } } //---------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { IEnumerable wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); return wrappedChild.Distinct(m_comparer); } class OrderedDistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Dictionary , TKey> m_hashLookup; // The hash lookup, used to produce the distinct set. private IComparer m_keyComparer; // Comparer to decide the key order. private IEnumerator , TKey>> m_hashLookupEnumerator; // Enumerates over m_hashLookup. private CancellationToken m_cancellationToken; //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal OrderedDistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, IComparer keyComparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; m_keyComparer = keyComparer; // @ m_hashLookup = new Dictionary , TKey>( new WrapperEqualityComparer (comparer)); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref TKey currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); if (m_hashLookupEnumerator == null) { Pair elem = default(Pair ); TKey orderKey = default(TKey); int i = 0; while (m_source.MoveNext(ref elem, ref orderKey)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // For each element, we track the smallest order key for that element that we saw so far TKey oldEntry; Wrapper wrappedElem = new Wrapper (elem.First); // If this is the first occurence of this element, or the order key is lower than all keys we saw previously, // update the order key for this element. if (!m_hashLookup.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(orderKey, oldEntry) < 0) { // For each "elem" value, we store the smallest key, and the element value that had that key. // Note that even though two element values are "equal" according to the EqualityComparer, // we still cannot choose arbitrarily which of the two to yield. m_hashLookup[wrappedElem] = orderKey; } } m_hashLookupEnumerator = m_hashLookup.GetEnumerator(); } if (m_hashLookupEnumerator.MoveNext()) { KeyValuePair , TKey> currentPair = m_hashLookupEnumerator.Current; currentElement = currentPair.Key.Value; currentKey = currentPair.Value; return true; } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); if (m_hashLookupEnumerator != null) { m_hashLookupEnumerator.Dispose(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- KeyValuePairs.cs
- TagMapCollection.cs
- SiteMembershipCondition.cs
- EntityDesignPluralizationHandler.cs
- TlsnegoTokenAuthenticator.cs
- EntityDataSourceDesignerHelper.cs
- MsmqIntegrationSecurityMode.cs
- CounterSample.cs
- IconConverter.cs
- MailMessageEventArgs.cs
- ButtonColumn.cs
- IndexingContentUnit.cs
- ZoneButton.cs
- BitmapEffectDrawingContent.cs
- AlternateViewCollection.cs
- PenThreadWorker.cs
- ResolveRequestResponseAsyncResult.cs
- SiteMapPath.cs
- SecurityRuntime.cs
- TextEditorTables.cs
- DataGridViewRowHeightInfoPushedEventArgs.cs
- SqlNodeAnnotations.cs
- SignatureDescription.cs
- RangeValuePatternIdentifiers.cs
- MiniConstructorInfo.cs
- SuppressMessageAttribute.cs
- ScriptingRoleServiceSection.cs
- EntityCommand.cs
- XslCompiledTransform.cs
- X509Utils.cs
- MeshGeometry3D.cs
- MetabaseReader.cs
- EditorPart.cs
- TableCell.cs
- RepeaterCommandEventArgs.cs
- Style.cs
- ChangeBlockUndoRecord.cs
- DrawItemEvent.cs
- Win32SafeHandles.cs
- ProviderSettingsCollection.cs
- AttributeParameterInfo.cs
- ApplicationSecurityInfo.cs
- ParallelActivityDesigner.cs
- HttpPostedFileBase.cs
- TrimSurroundingWhitespaceAttribute.cs
- FormView.cs
- HostExecutionContextManager.cs
- MimeBasePart.cs
- CompositeFontInfo.cs
- InternalResources.cs
- LineServices.cs
- XPathNodePointer.cs
- ICspAsymmetricAlgorithm.cs
- RuleSetBrowserDialog.cs
- TrimSurroundingWhitespaceAttribute.cs
- FileDetails.cs
- Identity.cs
- ToolBarOverflowPanel.cs
- Rfc2898DeriveBytes.cs
- PtsContext.cs
- WebPartActionVerb.cs
- XmlSignatureManifest.cs
- SystemWebCachingSectionGroup.cs
- XmlSchemaComplexContentRestriction.cs
- QuestionEventArgs.cs
- XmlObjectSerializerReadContextComplex.cs
- SqlConnectionPoolProviderInfo.cs
- GridViewUpdatedEventArgs.cs
- XPathNodeList.cs
- RegexMatch.cs
- ResourceDictionary.cs
- ClientScriptItemCollection.cs
- InstanceStore.cs
- MasterPageBuildProvider.cs
- PagedDataSource.cs
- RichTextBoxConstants.cs
- WebPartConnection.cs
- HMACRIPEMD160.cs
- OdbcErrorCollection.cs
- CommandTreeTypeHelper.cs
- ElementInit.cs
- PropertyRef.cs
- WorkflowServiceNamespace.cs
- LogStream.cs
- TimeoutTimer.cs
- DefaultPrintController.cs
- BlurBitmapEffect.cs
- ComAdminWrapper.cs
- ILGenerator.cs
- CompiledIdentityConstraint.cs
- Queue.cs
- DecoderBestFitFallback.cs
- IntSecurity.cs
- CompositeActivityDesigner.cs
- WebScriptClientGenerator.cs
- RegionData.cs
- DataGridViewCellValidatingEventArgs.cs
- Choices.cs
- StylusEventArgs.cs
- HttpProtocolImporter.cs