Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / DistinctQueryOperator.cs / 1305376 / DistinctQueryOperator.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // DistinctQueryOperator.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// This operator yields all of the distinct elements in a single data set. It works quite /// like the above set operations, with the obvious difference being that it only accepts /// a single data source as input. /// ///internal sealed class DistinctQueryOperator : UnaryQueryOperator { private readonly IEqualityComparer m_comparer; // An (optional) equality comparer. //---------------------------------------------------------------------------------------- // Constructs a new distinction operator. // internal DistinctQueryOperator(IEnumerable source, IEqualityComparer comparer) :base(source) { Contract.Assert(source != null, "child data source cannot be null"); m_comparer = comparer; SetOrdinalIndexState(OrdinalIndexState.Shuffled); } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the child and wrapping it with // partitions as needed. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { // We just open our child operator. Do not propagate the preferStriping value, but // instead explicitly set it to false. Regardless of whether the parent prefers striping or range // partitioning, the output will be hash-partititioned. QueryResults childResults = Child.Open(settings, false); return new UnaryQueryOperatorResults(childResults, this, settings, false); } internal override void WrapPartitionedStream ( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) { // Hash-repartion the source stream if (OutputOrdered) { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartitionOrdered ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } else { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartition ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } } //--------------------------------------------------------------------------------------- // This is a helper method. WrapPartitionedStream decides what type TKey is going // to be, and then call this method with that key as a generic parameter. // private void WrapPartitionedStreamHelper ( PartitionedStream , TKey> hashStream, IPartitionedStreamRecipient recipient, CancellationToken cancellationToken) { int partitionCount = hashStream.PartitionCount; PartitionedStream outputStream = new PartitionedStream (partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled); for (int i = 0; i < partitionCount; i++) { if (OutputOrdered) { outputStream[i] = new OrderedDistinctQueryOperatorEnumerator (hashStream[i], m_comparer, hashStream.KeyComparer, cancellationToken); } else { outputStream[i] = (QueryOperatorEnumerator )(object) new DistinctQueryOperatorEnumerator (hashStream[i], m_comparer, cancellationToken); } } recipient.Receive(outputStream); } //--------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return false; } } //---------------------------------------------------------------------------------------- // This enumerator performs the distinct operation incrementally. It does this by // maintaining a history -- in the form of a set -- of all data already seen. It simply // then doesn't return elements it has already seen before. // class DistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Set m_hashLookup; // The hash lookup, used to produce the distinct set. private CancellationToken m_cancellationToken; private Shared m_outputLoopCount; // Allocated in MoveNext to avoid false sharing. //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal DistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; // @ m_hashLookup = new Set (comparer); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); // Iterate over this set's elements until we find a unique element. TKey keyUnused = default(TKey); Pair current = default(Pair ); if (m_outputLoopCount == null) m_outputLoopCount = new Shared (0); while (m_source.MoveNext(ref current, ref keyUnused)) { if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We ensure we never return duplicates by tracking them in our set. if (m_hashLookup.Add(current.First)) { #if DEBUG currentKey = unchecked((int)0xdeadbeef); #endif currentElement = current.First; return true; } } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); } } //---------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { IEnumerable wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); return wrappedChild.Distinct(m_comparer); } class OrderedDistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Dictionary , TKey> m_hashLookup; // The hash lookup, used to produce the distinct set. private IComparer m_keyComparer; // Comparer to decide the key order. private IEnumerator , TKey>> m_hashLookupEnumerator; // Enumerates over m_hashLookup. private CancellationToken m_cancellationToken; //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal OrderedDistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, IComparer keyComparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; m_keyComparer = keyComparer; // @ m_hashLookup = new Dictionary , TKey>( new WrapperEqualityComparer (comparer)); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref TKey currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); if (m_hashLookupEnumerator == null) { Pair elem = default(Pair ); TKey orderKey = default(TKey); int i = 0; while (m_source.MoveNext(ref elem, ref orderKey)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // For each element, we track the smallest order key for that element that we saw so far TKey oldEntry; Wrapper wrappedElem = new Wrapper (elem.First); // If this is the first occurence of this element, or the order key is lower than all keys we saw previously, // update the order key for this element. if (!m_hashLookup.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(orderKey, oldEntry) < 0) { // For each "elem" value, we store the smallest key, and the element value that had that key. // Note that even though two element values are "equal" according to the EqualityComparer, // we still cannot choose arbitrarily which of the two to yield. m_hashLookup[wrappedElem] = orderKey; } } m_hashLookupEnumerator = m_hashLookup.GetEnumerator(); } if (m_hashLookupEnumerator.MoveNext()) { KeyValuePair , TKey> currentPair = m_hashLookupEnumerator.Current; currentElement = currentPair.Key.Value; currentKey = currentPair.Value; return true; } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); if (m_hashLookupEnumerator != null) { m_hashLookupEnumerator.Dispose(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // DistinctQueryOperator.cs // // [....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// This operator yields all of the distinct elements in a single data set. It works quite /// like the above set operations, with the obvious difference being that it only accepts /// a single data source as input. /// ///internal sealed class DistinctQueryOperator : UnaryQueryOperator { private readonly IEqualityComparer m_comparer; // An (optional) equality comparer. //---------------------------------------------------------------------------------------- // Constructs a new distinction operator. // internal DistinctQueryOperator(IEnumerable source, IEqualityComparer comparer) :base(source) { Contract.Assert(source != null, "child data source cannot be null"); m_comparer = comparer; SetOrdinalIndexState(OrdinalIndexState.Shuffled); } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the child and wrapping it with // partitions as needed. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { // We just open our child operator. Do not propagate the preferStriping value, but // instead explicitly set it to false. Regardless of whether the parent prefers striping or range // partitioning, the output will be hash-partititioned. QueryResults childResults = Child.Open(settings, false); return new UnaryQueryOperatorResults(childResults, this, settings, false); } internal override void WrapPartitionedStream ( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) { // Hash-repartion the source stream if (OutputOrdered) { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartitionOrdered ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } else { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartition ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } } //--------------------------------------------------------------------------------------- // This is a helper method. WrapPartitionedStream decides what type TKey is going // to be, and then call this method with that key as a generic parameter. // private void WrapPartitionedStreamHelper ( PartitionedStream , TKey> hashStream, IPartitionedStreamRecipient recipient, CancellationToken cancellationToken) { int partitionCount = hashStream.PartitionCount; PartitionedStream outputStream = new PartitionedStream (partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled); for (int i = 0; i < partitionCount; i++) { if (OutputOrdered) { outputStream[i] = new OrderedDistinctQueryOperatorEnumerator (hashStream[i], m_comparer, hashStream.KeyComparer, cancellationToken); } else { outputStream[i] = (QueryOperatorEnumerator )(object) new DistinctQueryOperatorEnumerator (hashStream[i], m_comparer, cancellationToken); } } recipient.Receive(outputStream); } //--------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return false; } } //---------------------------------------------------------------------------------------- // This enumerator performs the distinct operation incrementally. It does this by // maintaining a history -- in the form of a set -- of all data already seen. It simply // then doesn't return elements it has already seen before. // class DistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Set m_hashLookup; // The hash lookup, used to produce the distinct set. private CancellationToken m_cancellationToken; private Shared m_outputLoopCount; // Allocated in MoveNext to avoid false sharing. //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal DistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; // @ m_hashLookup = new Set (comparer); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); // Iterate over this set's elements until we find a unique element. TKey keyUnused = default(TKey); Pair current = default(Pair ); if (m_outputLoopCount == null) m_outputLoopCount = new Shared (0); while (m_source.MoveNext(ref current, ref keyUnused)) { if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We ensure we never return duplicates by tracking them in our set. if (m_hashLookup.Add(current.First)) { #if DEBUG currentKey = unchecked((int)0xdeadbeef); #endif currentElement = current.First; return true; } } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); } } //---------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { IEnumerable wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); return wrappedChild.Distinct(m_comparer); } class OrderedDistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Dictionary , TKey> m_hashLookup; // The hash lookup, used to produce the distinct set. private IComparer m_keyComparer; // Comparer to decide the key order. private IEnumerator , TKey>> m_hashLookupEnumerator; // Enumerates over m_hashLookup. private CancellationToken m_cancellationToken; //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal OrderedDistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, IComparer keyComparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; m_keyComparer = keyComparer; // @ m_hashLookup = new Dictionary , TKey>( new WrapperEqualityComparer (comparer)); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref TKey currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); if (m_hashLookupEnumerator == null) { Pair elem = default(Pair ); TKey orderKey = default(TKey); int i = 0; while (m_source.MoveNext(ref elem, ref orderKey)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // For each element, we track the smallest order key for that element that we saw so far TKey oldEntry; Wrapper wrappedElem = new Wrapper (elem.First); // If this is the first occurence of this element, or the order key is lower than all keys we saw previously, // update the order key for this element. if (!m_hashLookup.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(orderKey, oldEntry) < 0) { // For each "elem" value, we store the smallest key, and the element value that had that key. // Note that even though two element values are "equal" according to the EqualityComparer, // we still cannot choose arbitrarily which of the two to yield. m_hashLookup[wrappedElem] = orderKey; } } m_hashLookupEnumerator = m_hashLookup.GetEnumerator(); } if (m_hashLookupEnumerator.MoveNext()) { KeyValuePair , TKey> currentPair = m_hashLookupEnumerator.Current; currentElement = currentPair.Key.Value; currentKey = currentPair.Value; return true; } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); if (m_hashLookupEnumerator != null) { m_hashLookupEnumerator.Dispose(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- Menu.cs
- ZipIOExtraFieldZip64Element.cs
- OperationDescription.cs
- DataGridViewCellStateChangedEventArgs.cs
- TrustLevel.cs
- ClientConfigurationHost.cs
- ColumnMapProcessor.cs
- IERequestCache.cs
- OleDbPropertySetGuid.cs
- HGlobalSafeHandle.cs
- HtmlToClrEventProxy.cs
- DataGridViewEditingControlShowingEventArgs.cs
- CLSCompliantAttribute.cs
- CodeIndexerExpression.cs
- XMLSyntaxException.cs
- ComponentCommands.cs
- XmlSchemaInfo.cs
- CallSiteHelpers.cs
- ControlHelper.cs
- DataAdapter.cs
- AttributeParameterInfo.cs
- DeclarativeCatalogPart.cs
- ThreadStaticAttribute.cs
- StartUpEventArgs.cs
- EnumerableCollectionView.cs
- PtsCache.cs
- HttpCacheParams.cs
- ConnectionsZone.cs
- odbcmetadatacollectionnames.cs
- __ConsoleStream.cs
- Document.cs
- WorkflowInstanceProxy.cs
- ErrorView.xaml.cs
- ChtmlPageAdapter.cs
- Stack.cs
- TraceSection.cs
- SplashScreen.cs
- ReplyChannelAcceptor.cs
- GB18030Encoding.cs
- NodeLabelEditEvent.cs
- TailPinnedEventArgs.cs
- GridToolTip.cs
- PaperSource.cs
- XmlNodeReader.cs
- ResolvedKeyFrameEntry.cs
- SQLString.cs
- RuntimeHelpers.cs
- Vector3DCollection.cs
- ScopelessEnumAttribute.cs
- LazyTextWriterCreator.cs
- Geometry3D.cs
- ProfileGroupSettings.cs
- SimpleApplicationHost.cs
- SystemParameters.cs
- CorrelationResolver.cs
- RequiredArgumentAttribute.cs
- ControlUtil.cs
- LayoutEditorPart.cs
- DefinitionBase.cs
- ExpressionConverter.cs
- ResXDataNode.cs
- InputChannel.cs
- Hashtable.cs
- EtwTrackingBehaviorElement.cs
- BasicExpressionVisitor.cs
- NTAccount.cs
- DataTableMapping.cs
- WebReferencesBuildProvider.cs
- DbCommandTree.cs
- XmlSchemaSimpleType.cs
- UnsupportedPolicyOptionsException.cs
- NegotiateStream.cs
- TemplateNameScope.cs
- ChildrenQuery.cs
- IdentityValidationException.cs
- CompareValidator.cs
- Encoder.cs
- XmlSchemaAnnotated.cs
- SmtpDateTime.cs
- StringCollection.cs
- FlowPosition.cs
- GorillaCodec.cs
- ConditionalWeakTable.cs
- AudioLevelUpdatedEventArgs.cs
- RedirectionProxy.cs
- GeometryValueSerializer.cs
- KnownBoxes.cs
- TextRunCache.cs
- UpdateCommand.cs
- RequestQueue.cs
- ReadOnlyCollectionBase.cs
- AttributeCollection.cs
- EventRouteFactory.cs
- ReadOnlyCollection.cs
- SafeSecurityHandles.cs
- X509ServiceCertificateAuthentication.cs
- ExecutionEngineException.cs
- AutomationPatternInfo.cs
- DbXmlEnabledProviderManifest.cs
- dataSvcMapFileLoader.cs