Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Merging / AsynchronousChannelMergeEnumerator.cs / 1305376 / AsynchronousChannelMergeEnumerator.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // AsynchronousChannelMergeEnumerator.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Threading; using System.Diagnostics.Contracts; namespace System.Linq.Parallel { ////// An enumerator that merges multiple one-to-one channels into a single output /// stream, including any necessary blocking and synchronization. This is an /// asynchronous enumerator, i.e. the producers may be inserting items into the /// channels concurrently with the consumer taking items out of them. Therefore, /// enumerating this object can cause the current thread to block. /// /// We use a biased choice algorithm to choose from our consumer channels. I.e. we /// will prefer to process elements in a fair round-robin fashion, but will /// occ----ionally bypass this if a channel is empty. /// /// ///internal sealed class AsynchronousChannelMergeEnumerator : MergeEnumerator { private AsynchronousChannel [] m_channels; // The channels being enumerated. private ManualResetEventSlim[] m_channelEvents; // Events for the channels being enumerated. private bool[] m_done; // Tracks which channels are done. private int m_channelIndex; // The next channel from which we'll dequeue. private T m_currentElement; // The remembered element from the previous MoveNext. //------------------------------------------------------------------------------------ // Allocates a new enumerator over a set of one-to-one channels. // internal AsynchronousChannelMergeEnumerator( QueryTaskGroupState taskGroupState, AsynchronousChannel [] channels) : base(taskGroupState) { Contract.Assert(channels != null); #if DEBUG foreach (AsynchronousChannel c in channels) Contract.Assert(c != null); #endif m_channels = channels; m_channelIndex = -1; // To catch calls to Current before MoveNext. m_done = new bool[m_channels.Length]; // Initialized to { false }, i.e. no channels done. } //----------------------------------------------------------------------------------- // Retrieves the current element. // // Notes: // This throws if we haven't begun enumerating or have gone past the end of the // data source. // public override T Current { get { if (m_channelIndex == -1 || m_channelIndex == m_channels.Length) { throw new InvalidOperationException(SR.GetString(SR.PLINQ_CommonEnumerator_Current_NotStarted)); } return m_currentElement; } } //----------------------------------------------------------------------------------- // Positions the enumerator over the next element. This includes merging as we // enumerate, which may also involve waiting for producers to generate new items. // // Return Value: // True if there's a current element, false if we've reached the end. // public override bool MoveNext() { // On the first call to MoveNext, we advance the position to a real channel. int index = m_channelIndex; if (index == -1) { m_channelIndex = index = 0; } // If we're past the end, enumeration is done. if (index == m_channels.Length) { return false; } // Else try the fast path. if (!m_done[index] && m_channels[index].TryDequeue(ref m_currentElement)) { m_channelIndex = (index + 1) % m_channels.Length; return true; } return MoveNextSlowPath(); } //----------------------------------------------------------------------------------- // The slow path used when a quick loop through the channels didn't come up // with anything. We may need to block and/or mark channels as done. // private bool MoveNextSlowPath() { int doneChannels = 0; // Remember the first channel we are looking at. If we pass through all of the // channels without finding an element, we will go to sleep. int firstChannelIndex = m_channelIndex; int currChannelIndex; while ((currChannelIndex = m_channelIndex) != m_channels.Length) { AsynchronousChannel current = m_channels[currChannelIndex]; bool isDone = m_done[currChannelIndex]; if (!isDone && current.TryDequeue(ref m_currentElement)) { // The channel has an item to be processed. We already remembered the current // element (Dequeue stores it as an out-parameter), so we just return true // after advancing to the next channel. m_channelIndex = (currChannelIndex + 1) % m_channels.Length; return true; } else { // There isn't an element in the current channel. Check whether the channel // is done before possibly waiting for an element to arrive. if (!isDone && current.IsDone) { // We must check to ensure an item didn't get enqueued after originally // trying to dequeue above and reading the IsDone flag. If there are still // elements, the producer may have marked the channel as done but of course // we still need to continue processing them. if (!current.IsChunkBufferEmpty) { bool dequeueResult = current.TryDequeue(ref m_currentElement); Contract.Assert(dequeueResult, "channel isn't empty, yet the dequeue failed, hmm"); return true; } // Mark this channel as being truly done. We won't consider it any longer. m_done[currChannelIndex] = true; if (m_channelEvents != null) { m_channelEvents[currChannelIndex] = null; //we definitely never want to wait on this (soon to be disposed) event. } isDone = true; current.Dispose(); } if (isDone) { Contract.Assert(m_channels[currChannelIndex].IsDone, "thought this channel was done"); Contract.Assert(m_channels[currChannelIndex].IsChunkBufferEmpty, "thought this channel was empty"); // Increment the count of done channels that we've seen. If this reaches the // total number of channels, we know we're finally done. if (++doneChannels == m_channels.Length) { // Remember that we are done by setting the index past the end. m_channelIndex = currChannelIndex = m_channels.Length; break; } } // Still no element. Advance to the next channel and continue searching. m_channelIndex = currChannelIndex = (currChannelIndex + 1) % m_channels.Length; // If the channels aren't done, and we've inspected all of the queues and still // haven't found anything, we will go ahead and wait on all the queues. if (currChannelIndex == firstChannelIndex) { // On our first pass through the queues, we didn't have any side-effects // that would let a producer know we are waiting. Now we go through and // accumulate a set of events to wait on. try { // If this is the first time we must block, lazily allocate and cache // a list of events to be reused for next time. if (m_channelEvents == null) m_channelEvents = new ManualResetEventSlim[m_channels.Length]; // Reset our done channels counter; we need to tally them again during the // second pass through. doneChannels = 0; for (int i = 0; i < m_channels.Length; i++) { if (!m_done[i] && m_channels[i].TryDequeue(ref m_currentElement, ref m_channelEvents[i])) { Contract.Assert(m_channelEvents[i] == null); // The channel has received an item since the last time we checked. // Just return and let the consumer process the element returned. return true; } else if (m_channelEvents[i] == null) { // The channel in question is done and empty. Contract.Assert(m_channels[i].IsDone, "expected channel to be done"); Contract.Assert(m_channels[i].IsChunkBufferEmpty, "expected channel to be empty"); if (!m_done[i]) { m_done[i] = true; m_channels[i].Dispose(); } if (++doneChannels == m_channels.Length) { // No need to wait. All channels are done. Remember this by setting // the index past the end of the channel list. m_channelIndex = currChannelIndex = m_channels.Length; break; } } } // If all channels are done, we can break out of the loop entirely. if (currChannelIndex == m_channels.Length) { break; } // Finally, we have accumulated a set of events. Perform a wait-any on them. Contract.Assert(m_channelEvents.Length <= 63, "WaitForMultipleObjects only supports 63 threads if running on an STA thread (64 otherwise)."); //This WaitAny() does not require cancellation support as it will wake up when all the producers into the //channel have finished. Hence, if all the producers wake up on cancellation, so will this. m_channelIndex = currChannelIndex = WaitAny(m_channelEvents); Contract.Assert(0 <= m_channelIndex && m_channelIndex < m_channelEvents.Length); Contract.Assert(0 <= currChannelIndex && currChannelIndex < m_channelEvents.Length); Contract.Assert(m_channelEvents[currChannelIndex] != null); // // We have woken up, and the channel that caused this is contained in the // returned index. This could be due to one of two reasons. Either the channel's // producer has notified that it is done, in which case we just have to take it // out of our current wait-list and redo the wait, or a channel actually has an // item which we will go ahead and process. // // We just go back 'round the loop to accomplish this logic. Reset the channel // index and # of done channels. Go back to the beginning, starting with the channel // that caused us to wake up. // firstChannelIndex = currChannelIndex; doneChannels = 0; } finally { // We have to guarantee that any waits we said we would perform are undone. for (int i = 0; i < m_channelEvents.Length; i++) { // If we retrieved an event from a channel, we need to reset the wait. if (m_channelEvents[i] != null) { m_channels[i].DoneWithDequeueWait(); } } } } } } TraceHelpers.TraceInfo("[timing]: {0}: Completed the merge", DateTime.Now.Ticks); // If we got this far, it means we've exhausted our channels. Contract.Assert(currChannelIndex == m_channels.Length); // If any tasks failed, propagate the failure now. We must do it here, because the merge // executor returns control back to the caller before the query has completed; contrast // this with synchronous enumeration where we can wait before returning. m_taskGroupState.QueryEnd(false); return false; } /// /// WaitAny simulates a Win32-style WaitAny on the set of thin-events. /// /// An array of thin-events (null elements permitted) ///The index of the specific event in events that caused us to wake up. private static int WaitAny(ManualResetEventSlim[] events) { Contract.Assert(events != null); // Small spin wait loop. const int WAIT_ANY_SPINS = 20; SpinWait spinner = new SpinWait(); for (int i = 0; i < WAIT_ANY_SPINS; i++) { for (int j = 0; j < events.Length; j++) { if (events[j] != null && events[j].IsSet) return j; } spinner.SpinOnce(); } // Next, count up the null events. int nullEvents = 0; for (int i = 0; i < events.Length; i++) { if (events[i] == null) nullEvents++; } // Lastly, accumulate the events in preparation for a true wait. WaitHandle[] waitHandles = new WaitHandle[events.Length - nullEvents]; Contract.Assert(waitHandles.Length > 0); for (int i = 0, j = 0; i < events.Length; i++) { if (events[i] == null) { continue; } waitHandles[j] = events[i].WaitHandle; j++; } // And finally, issue the real wait. int index = WaitHandle.WaitAny(waitHandles); // Translate this back into the events array index. The 'waitHandles' array // will effectively have the non-null elements "slid down" into the positions // from 'events' that contain nulls. We count the number of null handles before // the index and add that to get our real position. for (int i = 0, j = -1; i < events.Length; i++) { // If the current event is non-null, increment our translation index. if (events[i] != null) { j++; // If we found the element, adjust our index and break. if (j == index) { index = i; break; } } Contract.Assert(i != events.Length - 1, "didn't find a non-null event"); } Contract.Assert(events[index] != null, "expected non-null event"); return index; } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- OperationCanceledException.cs
- ShaderRenderModeValidation.cs
- NullReferenceException.cs
- MultiBinding.cs
- MatcherBuilder.cs
- HttpHandlerActionCollection.cs
- WebPartPersonalization.cs
- XmlSchemas.cs
- HandlerBase.cs
- FontFamily.cs
- XmlObjectSerializer.cs
- OperandQuery.cs
- PasswordBoxAutomationPeer.cs
- GradientSpreadMethodValidation.cs
- SqlTopReducer.cs
- ColumnMap.cs
- Oid.cs
- XamlTypeMapper.cs
- TimelineCollection.cs
- SchemaObjectWriter.cs
- ThreadSafeList.cs
- EventPrivateKey.cs
- Convert.cs
- ScriptComponentDescriptor.cs
- NonVisualControlAttribute.cs
- DoubleCollection.cs
- ObjectStorage.cs
- DocumentXmlWriter.cs
- SocketElement.cs
- StatusBarItem.cs
- AnnotationDocumentPaginator.cs
- ListViewInsertionMark.cs
- XmlProcessingInstruction.cs
- DesignerLoader.cs
- ToolStripDesignerAvailabilityAttribute.cs
- AssemblyName.cs
- Int64Storage.cs
- AutomationPropertyInfo.cs
- XmlDigitalSignatureProcessor.cs
- SmtpNtlmAuthenticationModule.cs
- ProviderConnectionPoint.cs
- BookmarkScopeManager.cs
- TextParagraphCache.cs
- X509ClientCertificateCredentialsElement.cs
- SettingsSection.cs
- TreeNodeClickEventArgs.cs
- SeekableReadStream.cs
- Quaternion.cs
- CacheDependency.cs
- AttributeEmitter.cs
- ResourceDescriptionAttribute.cs
- EastAsianLunisolarCalendar.cs
- XmlHierarchicalEnumerable.cs
- EntityStoreSchemaFilterEntry.cs
- InternalCompensate.cs
- SqlUserDefinedTypeAttribute.cs
- SmtpTransport.cs
- ButtonColumn.cs
- TypedAsyncResult.cs
- OuterGlowBitmapEffect.cs
- DataGridToolTip.cs
- ButtonChrome.cs
- NameValuePermission.cs
- WebPartCatalogAddVerb.cs
- ControlBuilderAttribute.cs
- NumberAction.cs
- ColumnMap.cs
- TableLayoutPanelBehavior.cs
- C14NUtil.cs
- SessionEndingEventArgs.cs
- CopyNodeSetAction.cs
- CommonRemoteMemoryBlock.cs
- ClusterUtils.cs
- Sql8ConformanceChecker.cs
- ThreadAttributes.cs
- DefaultProxySection.cs
- BrowserCapabilitiesCodeGenerator.cs
- MaxValueConverter.cs
- SamlDoNotCacheCondition.cs
- UnicastIPAddressInformationCollection.cs
- AppSettings.cs
- PointAnimationBase.cs
- OleDbMetaDataFactory.cs
- WindowsEditBox.cs
- MessageSmuggler.cs
- DropShadowBitmapEffect.cs
- BreakRecordTable.cs
- OpenTypeLayout.cs
- SimpleApplicationHost.cs
- SecurityTraceRecordHelper.cs
- ProcessModuleCollection.cs
- SecurityContextTokenValidationException.cs
- ObjectConverter.cs
- DropDownButton.cs
- DataGridItemEventArgs.cs
- ProjectedWrapper.cs
- CodePageEncoding.cs
- SharedDp.cs
- GridViewUpdatedEventArgs.cs
- List.cs