Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Merging / SynchronousChannelMergeEnumerator.cs / 1305376 / SynchronousChannelMergeEnumerator.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SynchronousChannelMergeEnumerator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    /// <summary>
    /// This enumerator merges multiple input channels into a single output stream. The merging
    /// process just goes from left-to-right, enumerating each channel in succession in its
    /// entirety.
    ///
    /// Assumptions:
    ///     Before enumerating this object, all producers for all channels must have finished
    ///     enqueueing new elements.
    /// </summary>
    /// <typeparam name="T">The type of element stored in the channels and yielded by the merge.</typeparam>
    internal sealed class SynchronousChannelMergeEnumerator<T> : MergeEnumerator<T>
    {
        // The channel array we will enumerate, from left-to-right. Only assigned in the constructor.
        private readonly SynchronousChannel<T>[] m_channels;

        // The current channel index. This moves through the array as we enumerate.
        // -1 means enumeration has not started; m_channels.Length means it has finished.
        private int m_channelIndex;

        // The last element remembered during enumeration.
        private T m_currentElement;

        //------------------------------------------------------------------------------------
        // Instantiates a new enumerator for a set of channels.
        //
        // Arguments:
        //     taskGroupState - forwarded unchanged to the MergeEnumerator<T> base constructor.
        //     channels       - the channels to drain, in order; asserted to be non-null and
        //                      (in DEBUG builds) to contain no null entries.
        //

        internal SynchronousChannelMergeEnumerator(
            QueryTaskGroupState taskGroupState, SynchronousChannel<T>[] channels)
            : base(taskGroupState)
        {
            Contract.Assert(channels != null);
#if DEBUG
            foreach (SynchronousChannel<T> c in channels) Contract.Assert(c != null);
#endif

            m_channels = channels;

            // Sentinel: Current throws until the first MoveNext() moves us off -1.
            m_channelIndex = -1;
        }

        //-----------------------------------------------------------------------------------
        // Retrieves the current element.
        //
        // Notes:
        //     This throws InvalidOperationException if we haven't begun enumerating or have
        //     gone past the end of the data source. The same resource string
        //     (PLINQ_CommonEnumerator_Current_NotStarted) is used for both cases.
        //

        public override T Current
        {
            get
            {
                // If we're at the beginning or the end of the array, it's invalid to be
                // retrieving the current element. We throw.
                if (m_channelIndex == -1 || m_channelIndex == m_channels.Length)
                {
                    throw new InvalidOperationException(SR.GetString(SR.PLINQ_CommonEnumerator_Current_NotStarted));
                }

                return m_currentElement;
            }
        }

        //-----------------------------------------------------------------------------------
        // Positions the enumerator over the next element. This includes merging as we
        // enumerate, by just incrementing indexes, etc.
        //
        // Return Value:
        //     True if there's a current element, false if we've reached the end.
        //

        public override bool MoveNext()
        {
            Contract.Assert(m_channels != null);

            // If we're at the start, initialize the index.
            if (m_channelIndex == -1)
            {
                m_channelIndex = 0;
            }

            // If the index has reached the end, we bail.
            while (m_channelIndex != m_channels.Length)
            {
                SynchronousChannel<T> current = m_channels[m_channelIndex];
                Contract.Assert(current != null);

                if (current.Count == 0)
                {
                    // We're done with this channel, move on to the next one. We don't
                    // have to check that it's "done" since this is a synchronous consumer.
                    m_channelIndex++;
                }
                else
                {
                    // Remember the "current" element and return.
                    m_currentElement = current.Dequeue();
                    return true;
                }
            }

            TraceHelpers.TraceInfo("[timing]: {0}: Completed the merge", DateTime.Now.Ticks);

            // If we got this far, it means we've exhausted our channels. The index stays at
            // m_channels.Length, so later calls skip the loop and return false again.
            Contract.Assert(m_channelIndex == m_channels.Length);

            return false;
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- TransactionalPackage.cs
- ListItem.cs
- LZCodec.cs
- PackageStore.cs
- EventData.cs
- BindingOperations.cs
- WebPartMinimizeVerb.cs
- ParameterElementCollection.cs
- BindingMAnagerBase.cs
- TypeConstant.cs
- ProcessModelSection.cs
- MsmqSecureHashAlgorithm.cs
- TagNameToTypeMapper.cs
- TabControl.cs
- MobileDeviceCapabilitiesSectionHandler.cs
- _FtpControlStream.cs
- UrlPath.cs
- TerminateWorkflow.cs
- SqlHelper.cs
- Control.cs
- BitmapSource.cs
- XmlBindingWorker.cs
- OperationInfoBase.cs
- _NestedSingleAsyncResult.cs
- RegisteredArrayDeclaration.cs
- XamlClipboardData.cs
- ConsoleEntryPoint.cs
- Int32CollectionValueSerializer.cs
- ManagementOptions.cs
- ReflectionHelper.cs
- HTMLTextWriter.cs
- Attachment.cs
- AesCryptoServiceProvider.cs
- Int16Animation.cs
- ChangeInterceptorAttribute.cs
- MetabaseServerConfig.cs
- WindowShowOrOpenTracker.cs
- ResourceDictionary.cs
- FocusWithinProperty.cs
- SQLGuid.cs
- SafeEventHandle.cs
- EntityDataSourceWrapperCollection.cs
- DeviceSpecificDialogCachedState.cs
- PersonalizationProviderCollection.cs
- MembershipValidatePasswordEventArgs.cs
- ParseNumbers.cs
- ServiceObjectContainer.cs
- Preprocessor.cs
- InternalSafeNativeMethods.cs
- CfgParser.cs
- __Error.cs
- _Win32.cs
- InternalCache.cs
- ComplexTypeEmitter.cs
- DataControlCommands.cs
- StandardBindingElementCollection.cs
- KeyboardDevice.cs
- Simplifier.cs
- RoleServiceManager.cs
- SpeechSynthesizer.cs
- ConfigsHelper.cs
- HandlerBase.cs
- ReadOnlyPropertyMetadata.cs
- BitmapEffectInput.cs
- RootBrowserWindowAutomationPeer.cs
- ColorTransform.cs
- ManualResetEvent.cs
- GroupDescription.cs
- Comparer.cs
- Perspective.cs
- ObservableDictionary.cs
- HostingPreferredMapPath.cs
- TextDecorationUnitValidation.cs
- AutomationElement.cs
- UTF7Encoding.cs
- DocumentSequence.cs
- TextContainerChangeEventArgs.cs
- PropertyValue.cs
- UInt64.cs
- DateTimeUtil.cs
- PerformanceCounter.cs
- ValueQuery.cs
- RegexStringValidator.cs
- WmlFormAdapter.cs
- System.Data.OracleClient_BID.cs
- GeometryHitTestResult.cs
- AuthorizationRuleCollection.cs
- RenderingBiasValidation.cs
- ErrorTableItemStyle.cs
- OrderPreservingPipeliningSpoolingTask.cs
- FontFamilyValueSerializer.cs
- UInt16.cs
- FocusManager.cs
- TraceSource.cs
- MemberDomainMap.cs
- ActivityExecutorDelegateInfo.cs
- VisualStates.cs
- CodeRemoveEventStatement.cs
- ErrorTableItemStyle.cs
- UnsafeNativeMethods.cs