Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Merging / OrderPreservingPipeliningMergeHelper.cs / 1305376 / OrderPreservingPipeliningMergeHelper.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // OrderPreservingPipeliningMergeHelper.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; using System.Threading.Tasks; namespace System.Linq.Parallel { ////// A merge helper that yields results in a streaming fashion, while still ensuring correct output /// ordering. This merge only works if each producer task generates outputs in the correct order, /// i.e. with an Increasing (or Correct) order index. /// /// The merge creates DOP producer tasks, each of which will be writing results into a separate /// buffer. /// /// The consumer always waits until each producer buffer contains at least one element. If we don't /// have one element from each producer, we cannot yield the next element. (If the order index is /// Correct, or in some special cases with the Increasing order, we could yield sooner. The /// current algorithm does not take advantage of this.) /// /// The consumer maintains a producer heap, and uses it to decide which producer should yield the next output /// result. After yielding an element from a particular producer, the consumer will take another element /// from the same producer. However, if the producer buffer exceeded a particular threshold, the consumer /// will take the entire buffer, and give the producer an empty buffer to fill. /// /// Finally, if the producer notices that its buffer has exceeded an even greater threshold, it will /// go to sleep and wait until the consumer takes the entire buffer. /// internal class OrderPreservingPipeliningMergeHelper: IMergeHelper { private readonly QueryTaskGroupState m_taskGroupState; // State shared among tasks. private readonly PartitionedStream m_partitions; // Source partitions. private readonly TaskScheduler m_taskScheduler; // The task manager to execute the query. /// /// Whether the producer is allowed to buffer up elements before handing a chunk to the consumer. /// If false, the producer will make each result available to the consumer immediately after it is /// produced. /// private readonly bool m_autoBuffered; ////// Buffers for the results. Each buffer has elements added by one producer, and removed /// by the consumer. /// private readonly Queue>[] m_buffers; /// /// Whether each producer is done producing. Set to true by individual producers, read by consumer. /// private readonly bool[] m_producerDone; ////// Whether a particular producer is waiting on the consumer. Read by the consumer, set to true /// by producers, set to false by the consumer. /// private readonly bool[] m_producerWaiting; ////// Whether the consumer is waiting on a particular producer. Read by producers, set to true /// by consumer, set to false by producer. /// private readonly bool[] m_consumerWaiting; ////// Each object is a lock protecting the corresponding elements in m_buffers, m_producerDone, /// m_producerWaiting and m_consumerWaiting. /// private readonly object[] m_bufferLocks; ////// A singleton instance of the comparer used by the producer heap. Eager allocation is OK /// because if the static constructor runs, we will be using this merge. /// private static ProducerComparer s_producerComparer = new ProducerComparer(); ////// The initial capacity of the buffer queue. The value was chosen experimentally. /// internal const int INITIAL_BUFFER_SIZE = 128; ////// If the consumer notices that the queue reached this limit, it will take the entire buffer from /// the producer, instead of just popping off one result. The value was chosen experimentally. /// internal const int STEAL_BUFFER_SIZE = 1024; ////// If the producer notices that the queue reached this limit, it will go to sleep until woken up /// by the consumer. Chosen experimentally. /// internal const int MAX_BUFFER_SIZE = 8192; //------------------------------------------------------------------------------------ // Instantiates a new merge helper. // // Arguments: // partitions - the source partitions from which to consume data. // ignoreOutput - whether we're enumerating "for effect" or for output. // internal OrderPreservingPipeliningMergeHelper( PartitionedStreampartitions, TaskScheduler taskScheduler, CancellationState cancellationState, bool autoBuffered, int queryId) { Contract.Assert(partitions != null); TraceHelpers.TraceInfo("KeyOrderPreservingMergeHelper::.ctor(..): creating an order preserving merge helper"); m_taskGroupState = new QueryTaskGroupState(cancellationState, queryId); m_partitions = partitions; m_taskScheduler = taskScheduler; m_autoBuffered = autoBuffered; int partitionCount = m_partitions.PartitionCount; m_buffers = new Queue >[partitionCount]; m_producerDone = new bool[partitionCount]; m_consumerWaiting = new bool[partitionCount]; m_producerWaiting = new bool[partitionCount]; m_bufferLocks = new object[partitionCount]; } //----------------------------------------------------------------------------------- // Schedules execution of the merge itself. // void IMergeHelper .Execute() { OrderPreservingPipeliningSpoolingTask .Spool( m_taskGroupState, m_partitions, m_consumerWaiting, m_producerWaiting, m_producerDone, m_buffers, m_bufferLocks, m_taskScheduler, m_autoBuffered); } //----------------------------------------------------------------------------------- // Gets the enumerator from which to enumerate output results. // IEnumerator IMergeHelper .GetEnumerator() { return new OrderedPipeliningMergeEnumerator(this); } //----------------------------------------------------------------------------------- // Returns the results as an array. // public TOutput[] GetResultsAsArray() { Contract.Assert(false, "An ordered pipelining merge is not intended to be used this way."); throw new InvalidOperationException(); } /// /// A structure to represent a producer in the producer heap. /// private struct Producer { internal readonly int MaxKey; // Order index of the next element from this producer internal readonly int ProducerIndex; // Index of the producer, [0..DOP) internal Producer(int maxKey, int producerIndex) { MaxKey = maxKey; ProducerIndex = producerIndex; } } ////// A comparer used by FixedMaxHeap(Of Producer) /// /// This comparer will be used by max-heap. We want the producer with the smallest MaxKey to /// end up in the root of the heap. /// /// x.MaxKey GREATER_THAN y.MaxKey => x LESS_THAN y => return - /// x.MaxKey EQUALS y.MaxKey => x EQUALS y => return 0 /// x.MaxKey LESS_THAN y.MaxKey => x GREATER_THAN y => return + /// private class ProducerComparer : IComparer{ public int Compare(Producer x, Producer y) { Contract.Assert(x.MaxKey >= 0 && y.MaxKey >= 0); // Guarantees no overflow on next line return y.MaxKey - x.MaxKey; } } /// /// Enumerator over the results of an order-preserving pipelining merge. /// private class OrderedPipeliningMergeEnumerator : MergeEnumerator{ /// /// Merge helper associated with this enumerator /// private OrderPreservingPipeliningMergeHelperm_mergeHelper; /// /// Heap used to efficiently locate the producer whose result should be consumed next. /// For each producer, stores the order index for the next element to be yielded. /// /// Read and written by the consumer only. /// private readonly FixedMaxHeapm_producerHeap; /// /// Stores the next element to be yielded from each producer. We use a separate array /// rather than storing this information in the producer heap to keep the Producer struct /// small. /// /// Read and written by the consumer only. /// private readonly TOutput[] m_producerNextElement; ////// A private buffer for the consumer. When the size of a producer buffer exceeds a threshold /// (STEAL_BUFFER_SIZE), the consumer will take ownership of the entire buffer, and give the /// producer a new empty buffer to place results into. /// /// Read and written by the consumer only. /// private readonly Queue>[] m_privateBuffer; /// /// Tracks whether MoveNext() has already been called previously. /// private bool m_initialized = false; ////// Constructor /// internal OrderedPipeliningMergeEnumerator(OrderPreservingPipeliningMergeHelpermergeHelper) :base(mergeHelper.m_taskGroupState) { int partitionCount = mergeHelper.m_partitions.PartitionCount; m_mergeHelper = mergeHelper; m_producerHeap = new FixedMaxHeap (partitionCount, s_producerComparer); m_privateBuffer = new Queue >[partitionCount]; m_producerNextElement = new TOutput[partitionCount]; } /// /// Returns the current result /// public override TOutput Current { get { int producerToYield = m_producerHeap.MaxValue.ProducerIndex; return m_producerNextElement[producerToYield]; } } ////// Moves the enumerator to the next result, or returns false if there are no more results to yield. /// public override bool MoveNext() { if (!m_initialized) { // // Initialization: wait until each producer has produced at least one element. Since the order indices // are increasing, we cannot start yielding until we have at least one element from each producer. // m_initialized = true; for (int producer = 0; producer < m_mergeHelper.m_partitions.PartitionCount; producer++) { Pairelement = default(Pair ); // Get the first element from this producer if (TryWaitForElement(producer, ref element)) { // Update the producer heap and its helper array with the received element m_producerHeap.Insert(new Producer(element.First, producer)); m_producerNextElement[producer] = element.Second; } else { // If this producer didn't produce any results because it encountered an exception, // cancellation would have been initiated by now. If cancellation has started, we will // propagate the exception now. ThrowIfInTearDown(); } } } else { // If the producer heap is empty, we are done. In fact, we know that a previous MoveNext() call // already returned false. if (m_producerHeap.Count == 0) { return false; } // // Get the next element from the producer that yielded a value last. Update the producer heap. // The next producer to yield will be in the front of the producer heap. // // The last producer whose result the merge yielded int lastProducer = m_producerHeap.MaxValue.ProducerIndex; // Get the next element from the same producer Pair element = default(Pair ); if (TryGetPrivateElement(lastProducer, ref element) || TryWaitForElement(lastProducer, ref element)) { // Update the producer heap and its helper array with the received element m_producerHeap.ReplaceMax(new Producer(element.First, lastProducer)); m_producerNextElement[lastProducer] = element.Second; } else { // If this producer is done because it encountered an exception, cancellation // would have been initiated by now. If cancellation has started, we will propagate // the exception now. ThrowIfInTearDown(); // This producer is done. Remove it from the producer heap. m_producerHeap.RemoveMax(); } } return m_producerHeap.Count > 0; } /// /// If the cancellation of the query has been initiated (because one or more producers /// encountered exceptions, or because external cancellation token has been set), the method /// will tear down the query and rethrow the exception. /// private void ThrowIfInTearDown() { if (m_mergeHelper.m_taskGroupState.CancellationState.MergedCancellationToken.IsCancellationRequested) { try { // Wake up all producers. Since the cancellation token has already been // set, the producers will eventually stop after waking up. object[] locks = m_mergeHelper.m_bufferLocks; for (int i = 0; i < locks.Length; i++) { lock (locks[i]) { Monitor.Pulse(locks[i]); } } // Now, we wait for all producers to wake up, notice the cancellation and stop executing. // QueryEnd will wait on all tasks to complete and then propagate all exceptions. m_taskGroupState.QueryEnd(false); Contract.Assert(false, "QueryEnd() should have thrown an exception."); } finally { // Clear the producer heap so that future calls to MoveNext simply return false. m_producerHeap.Clear(); } } } ////// Wait until a producer's buffer is non-empty, or until that producer is done. /// ///false if there is no element to yield because the producer is done, true otherwise private bool TryWaitForElement(int producer, ref Pairelement) { Queue > buffer = m_mergeHelper.m_buffers[producer]; object bufferLock = m_mergeHelper.m_bufferLocks[producer]; lock (bufferLock) { // If the buffer is empty, we need to wait on the producer if (buffer.Count == 0) { // If the producer is already done, return false if (m_mergeHelper.m_producerDone[producer]) { element = default(Pair ); return false; } m_mergeHelper.m_consumerWaiting[producer] = true; Monitor.Wait(bufferLock); // If the buffer is still empty, the producer is done if (buffer.Count == 0) { Contract.Assert(m_mergeHelper.m_producerDone[producer]); element = default(Pair ); return false; } } Contract.Assert(buffer.Count > 0, "Producer's buffer should not be empty here."); // If the producer is waiting, wake it up if (m_mergeHelper.m_producerWaiting[producer]) { Monitor.Pulse(bufferLock); m_mergeHelper.m_producerWaiting[producer] = false; } if (buffer.Count < STEAL_BUFFER_SIZE) { element = buffer.Dequeue(); return true; } else { // Privatize the entire buffer m_privateBuffer[producer] = m_mergeHelper.m_buffers[producer]; // Give an empty buffer to the producer m_mergeHelper.m_buffers[producer] = new Queue >(INITIAL_BUFFER_SIZE); // No return statement. // This is the only branch that contines below of the lock region. } } // Get an element out of the private buffer. bool gotElement = TryGetPrivateElement(producer, ref element); Contract.Assert(gotElement); return true; } /// /// Looks for an element from a particular producer in the consumer's private buffer. /// private bool TryGetPrivateElement(int producer, ref Pairelement) { var privateChunk = m_privateBuffer[producer]; if (privateChunk != null) { if (privateChunk.Count > 0) { element = privateChunk.Dequeue(); return true; } Contract.Assert(m_privateBuffer[producer].Count == 0); m_privateBuffer[producer] = null; } return false; } public override void Dispose() { // Wake up any waiting producers int partitionCount = m_mergeHelper.m_buffers.Length; for (int producer = 0; producer < partitionCount; producer++) { object bufferLock = m_mergeHelper.m_bufferLocks[producer]; lock (bufferLock) { if (m_mergeHelper.m_producerWaiting[producer]) { Monitor.Pulse(bufferLock); } } } base.Dispose(); } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // OrderPreservingPipeliningMergeHelper.cs // // [....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; using System.Threading.Tasks; namespace System.Linq.Parallel { ////// A merge helper that yields results in a streaming fashion, while still ensuring correct output /// ordering. This merge only works if each producer task generates outputs in the correct order, /// i.e. with an Increasing (or Correct) order index. /// /// The merge creates DOP producer tasks, each of which will be writing results into a separate /// buffer. /// /// The consumer always waits until each producer buffer contains at least one element. If we don't /// have one element from each producer, we cannot yield the next element. (If the order index is /// Correct, or in some special cases with the Increasing order, we could yield sooner. The /// current algorithm does not take advantage of this.) /// /// The consumer maintains a producer heap, and uses it to decide which producer should yield the next output /// result. After yielding an element from a particular producer, the consumer will take another element /// from the same producer. However, if the producer buffer exceeded a particular threshold, the consumer /// will take the entire buffer, and give the producer an empty buffer to fill. /// /// Finally, if the producer notices that its buffer has exceeded an even greater threshold, it will /// go to sleep and wait until the consumer takes the entire buffer. /// internal class OrderPreservingPipeliningMergeHelper: IMergeHelper { private readonly QueryTaskGroupState m_taskGroupState; // State shared among tasks. private readonly PartitionedStream m_partitions; // Source partitions. private readonly TaskScheduler m_taskScheduler; // The task manager to execute the query. /// /// Whether the producer is allowed to buffer up elements before handing a chunk to the consumer. /// If false, the producer will make each result available to the consumer immediately after it is /// produced. /// private readonly bool m_autoBuffered; ////// Buffers for the results. Each buffer has elements added by one producer, and removed /// by the consumer. /// private readonly Queue>[] m_buffers; /// /// Whether each producer is done producing. Set to true by individual producers, read by consumer. /// private readonly bool[] m_producerDone; ////// Whether a particular producer is waiting on the consumer. Read by the consumer, set to true /// by producers, set to false by the consumer. /// private readonly bool[] m_producerWaiting; ////// Whether the consumer is waiting on a particular producer. Read by producers, set to true /// by consumer, set to false by producer. /// private readonly bool[] m_consumerWaiting; ////// Each object is a lock protecting the corresponding elements in m_buffers, m_producerDone, /// m_producerWaiting and m_consumerWaiting. /// private readonly object[] m_bufferLocks; ////// A singleton instance of the comparer used by the producer heap. Eager allocation is OK /// because if the static constructor runs, we will be using this merge. /// private static ProducerComparer s_producerComparer = new ProducerComparer(); ////// The initial capacity of the buffer queue. The value was chosen experimentally. /// internal const int INITIAL_BUFFER_SIZE = 128; ////// If the consumer notices that the queue reached this limit, it will take the entire buffer from /// the producer, instead of just popping off one result. The value was chosen experimentally. /// internal const int STEAL_BUFFER_SIZE = 1024; ////// If the producer notices that the queue reached this limit, it will go to sleep until woken up /// by the consumer. Chosen experimentally. /// internal const int MAX_BUFFER_SIZE = 8192; //------------------------------------------------------------------------------------ // Instantiates a new merge helper. // // Arguments: // partitions - the source partitions from which to consume data. // ignoreOutput - whether we're enumerating "for effect" or for output. // internal OrderPreservingPipeliningMergeHelper( PartitionedStreampartitions, TaskScheduler taskScheduler, CancellationState cancellationState, bool autoBuffered, int queryId) { Contract.Assert(partitions != null); TraceHelpers.TraceInfo("KeyOrderPreservingMergeHelper::.ctor(..): creating an order preserving merge helper"); m_taskGroupState = new QueryTaskGroupState(cancellationState, queryId); m_partitions = partitions; m_taskScheduler = taskScheduler; m_autoBuffered = autoBuffered; int partitionCount = m_partitions.PartitionCount; m_buffers = new Queue >[partitionCount]; m_producerDone = new bool[partitionCount]; m_consumerWaiting = new bool[partitionCount]; m_producerWaiting = new bool[partitionCount]; m_bufferLocks = new object[partitionCount]; } //----------------------------------------------------------------------------------- // Schedules execution of the merge itself. // void IMergeHelper .Execute() { OrderPreservingPipeliningSpoolingTask .Spool( m_taskGroupState, m_partitions, m_consumerWaiting, m_producerWaiting, m_producerDone, m_buffers, m_bufferLocks, m_taskScheduler, m_autoBuffered); } //----------------------------------------------------------------------------------- // Gets the enumerator from which to enumerate output results. // IEnumerator IMergeHelper .GetEnumerator() { return new OrderedPipeliningMergeEnumerator(this); } //----------------------------------------------------------------------------------- // Returns the results as an array. // public TOutput[] GetResultsAsArray() { Contract.Assert(false, "An ordered pipelining merge is not intended to be used this way."); throw new InvalidOperationException(); } /// /// A structure to represent a producer in the producer heap. /// private struct Producer { internal readonly int MaxKey; // Order index of the next element from this producer internal readonly int ProducerIndex; // Index of the producer, [0..DOP) internal Producer(int maxKey, int producerIndex) { MaxKey = maxKey; ProducerIndex = producerIndex; } } ////// A comparer used by FixedMaxHeap(Of Producer) /// /// This comparer will be used by max-heap. We want the producer with the smallest MaxKey to /// end up in the root of the heap. /// /// x.MaxKey GREATER_THAN y.MaxKey => x LESS_THAN y => return - /// x.MaxKey EQUALS y.MaxKey => x EQUALS y => return 0 /// x.MaxKey LESS_THAN y.MaxKey => x GREATER_THAN y => return + /// private class ProducerComparer : IComparer{ public int Compare(Producer x, Producer y) { Contract.Assert(x.MaxKey >= 0 && y.MaxKey >= 0); // Guarantees no overflow on next line return y.MaxKey - x.MaxKey; } } /// /// Enumerator over the results of an order-preserving pipelining merge. /// private class OrderedPipeliningMergeEnumerator : MergeEnumerator{ /// /// Merge helper associated with this enumerator /// private OrderPreservingPipeliningMergeHelperm_mergeHelper; /// /// Heap used to efficiently locate the producer whose result should be consumed next. /// For each producer, stores the order index for the next element to be yielded. /// /// Read and written by the consumer only. /// private readonly FixedMaxHeapm_producerHeap; /// /// Stores the next element to be yielded from each producer. We use a separate array /// rather than storing this information in the producer heap to keep the Producer struct /// small. /// /// Read and written by the consumer only. /// private readonly TOutput[] m_producerNextElement; ////// A private buffer for the consumer. When the size of a producer buffer exceeds a threshold /// (STEAL_BUFFER_SIZE), the consumer will take ownership of the entire buffer, and give the /// producer a new empty buffer to place results into. /// /// Read and written by the consumer only. /// private readonly Queue>[] m_privateBuffer; /// /// Tracks whether MoveNext() has already been called previously. /// private bool m_initialized = false; ////// Constructor /// internal OrderedPipeliningMergeEnumerator(OrderPreservingPipeliningMergeHelpermergeHelper) :base(mergeHelper.m_taskGroupState) { int partitionCount = mergeHelper.m_partitions.PartitionCount; m_mergeHelper = mergeHelper; m_producerHeap = new FixedMaxHeap (partitionCount, s_producerComparer); m_privateBuffer = new Queue >[partitionCount]; m_producerNextElement = new TOutput[partitionCount]; } /// /// Returns the current result /// public override TOutput Current { get { int producerToYield = m_producerHeap.MaxValue.ProducerIndex; return m_producerNextElement[producerToYield]; } } ////// Moves the enumerator to the next result, or returns false if there are no more results to yield. /// public override bool MoveNext() { if (!m_initialized) { // // Initialization: wait until each producer has produced at least one element. Since the order indices // are increasing, we cannot start yielding until we have at least one element from each producer. // m_initialized = true; for (int producer = 0; producer < m_mergeHelper.m_partitions.PartitionCount; producer++) { Pairelement = default(Pair ); // Get the first element from this producer if (TryWaitForElement(producer, ref element)) { // Update the producer heap and its helper array with the received element m_producerHeap.Insert(new Producer(element.First, producer)); m_producerNextElement[producer] = element.Second; } else { // If this producer didn't produce any results because it encountered an exception, // cancellation would have been initiated by now. If cancellation has started, we will // propagate the exception now. ThrowIfInTearDown(); } } } else { // If the producer heap is empty, we are done. In fact, we know that a previous MoveNext() call // already returned false. if (m_producerHeap.Count == 0) { return false; } // // Get the next element from the producer that yielded a value last. Update the producer heap. // The next producer to yield will be in the front of the producer heap. // // The last producer whose result the merge yielded int lastProducer = m_producerHeap.MaxValue.ProducerIndex; // Get the next element from the same producer Pair element = default(Pair ); if (TryGetPrivateElement(lastProducer, ref element) || TryWaitForElement(lastProducer, ref element)) { // Update the producer heap and its helper array with the received element m_producerHeap.ReplaceMax(new Producer(element.First, lastProducer)); m_producerNextElement[lastProducer] = element.Second; } else { // If this producer is done because it encountered an exception, cancellation // would have been initiated by now. If cancellation has started, we will propagate // the exception now. ThrowIfInTearDown(); // This producer is done. Remove it from the producer heap. m_producerHeap.RemoveMax(); } } return m_producerHeap.Count > 0; } /// /// If the cancellation of the query has been initiated (because one or more producers /// encountered exceptions, or because external cancellation token has been set), the method /// will tear down the query and rethrow the exception. /// private void ThrowIfInTearDown() { if (m_mergeHelper.m_taskGroupState.CancellationState.MergedCancellationToken.IsCancellationRequested) { try { // Wake up all producers. Since the cancellation token has already been // set, the producers will eventually stop after waking up. object[] locks = m_mergeHelper.m_bufferLocks; for (int i = 0; i < locks.Length; i++) { lock (locks[i]) { Monitor.Pulse(locks[i]); } } // Now, we wait for all producers to wake up, notice the cancellation and stop executing. // QueryEnd will wait on all tasks to complete and then propagate all exceptions. m_taskGroupState.QueryEnd(false); Contract.Assert(false, "QueryEnd() should have thrown an exception."); } finally { // Clear the producer heap so that future calls to MoveNext simply return false. m_producerHeap.Clear(); } } } ////// Wait until a producer's buffer is non-empty, or until that producer is done. /// ///false if there is no element to yield because the producer is done, true otherwise private bool TryWaitForElement(int producer, ref Pairelement) { Queue > buffer = m_mergeHelper.m_buffers[producer]; object bufferLock = m_mergeHelper.m_bufferLocks[producer]; lock (bufferLock) { // If the buffer is empty, we need to wait on the producer if (buffer.Count == 0) { // If the producer is already done, return false if (m_mergeHelper.m_producerDone[producer]) { element = default(Pair ); return false; } m_mergeHelper.m_consumerWaiting[producer] = true; Monitor.Wait(bufferLock); // If the buffer is still empty, the producer is done if (buffer.Count == 0) { Contract.Assert(m_mergeHelper.m_producerDone[producer]); element = default(Pair ); return false; } } Contract.Assert(buffer.Count > 0, "Producer's buffer should not be empty here."); // If the producer is waiting, wake it up if (m_mergeHelper.m_producerWaiting[producer]) { Monitor.Pulse(bufferLock); m_mergeHelper.m_producerWaiting[producer] = false; } if (buffer.Count < STEAL_BUFFER_SIZE) { element = buffer.Dequeue(); return true; } else { // Privatize the entire buffer m_privateBuffer[producer] = m_mergeHelper.m_buffers[producer]; // Give an empty buffer to the producer m_mergeHelper.m_buffers[producer] = new Queue >(INITIAL_BUFFER_SIZE); // No return statement. // This is the only branch that contines below of the lock region. } } // Get an element out of the private buffer. bool gotElement = TryGetPrivateElement(producer, ref element); Contract.Assert(gotElement); return true; } /// /// Looks for an element from a particular producer in the consumer's private buffer. /// private bool TryGetPrivateElement(int producer, ref Pairelement) { var privateChunk = m_privateBuffer[producer]; if (privateChunk != null) { if (privateChunk.Count > 0) { element = privateChunk.Dequeue(); return true; } Contract.Assert(m_privateBuffer[producer].Count == 0); m_privateBuffer[producer] = null; } return false; } public override void Dispose() { // Wake up any waiting producers int partitionCount = m_mergeHelper.m_buffers.Length; for (int producer = 0; producer < partitionCount; producer++) { object bufferLock = m_mergeHelper.m_bufferLocks[producer]; lock (bufferLock) { if (m_mergeHelper.m_producerWaiting[producer]) { Monitor.Pulse(bufferLock); } } } base.Dispose(); } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- CompiledQueryCacheKey.cs
- TableRow.cs
- OneOf.cs
- GradientBrush.cs
- WithStatement.cs
- ContextBase.cs
- UIElement.cs
- TracedNativeMethods.cs
- UIElement3DAutomationPeer.cs
- FormViewModeEventArgs.cs
- TypedReference.cs
- _ConnectStream.cs
- IdleTimeoutMonitor.cs
- ContentPropertyAttribute.cs
- DataSourceConverter.cs
- EditorAttribute.cs
- EntityContainer.cs
- BaseCodeDomTreeGenerator.cs
- OracleRowUpdatingEventArgs.cs
- Vector3DCollectionConverter.cs
- WindowsFormsLinkLabel.cs
- StreamMarshaler.cs
- COM2EnumConverter.cs
- Sorting.cs
- WorkflowRuntimeServiceElementCollection.cs
- HttpModulesSection.cs
- MimeTypeAttribute.cs
- GPRECT.cs
- CodeTypeReferenceExpression.cs
- VerticalAlignConverter.cs
- XamlTreeBuilder.cs
- StorageFunctionMapping.cs
- TypeSource.cs
- GeneralTransform3D.cs
- HuffModule.cs
- XamlVector3DCollectionSerializer.cs
- Repeater.cs
- ScriptMethodAttribute.cs
- Cursor.cs
- ErrorHandler.cs
- DataObject.cs
- IdentityNotMappedException.cs
- Translator.cs
- AppliedDeviceFiltersDialog.cs
- WebPartConnectionsDisconnectVerb.cs
- GroupPartitionExpr.cs
- ReferentialConstraint.cs
- PropertyPath.cs
- WindowsStartMenu.cs
- Thumb.cs
- EmptyQuery.cs
- RepeaterItemCollection.cs
- FunctionOverloadResolver.cs
- ApplySecurityAndSendAsyncResult.cs
- WebPartsPersonalization.cs
- AddInStore.cs
- BasicExpressionVisitor.cs
- RangeBaseAutomationPeer.cs
- TextSearch.cs
- DBDataPermission.cs
- ValidationHelpers.cs
- CharacterShapingProperties.cs
- DuplexChannel.cs
- TreeView.cs
- ImageFormatConverter.cs
- CommonDialog.cs
- CompilationUtil.cs
- WmpBitmapDecoder.cs
- PageCatalogPart.cs
- DeploymentSection.cs
- HttpWriter.cs
- Vector3dCollection.cs
- TriggerBase.cs
- BypassElementCollection.cs
- TransformGroup.cs
- StreamInfo.cs
- TemplateControlCodeDomTreeGenerator.cs
- RequestContext.cs
- Wildcard.cs
- HttpApplicationStateBase.cs
- BitStream.cs
- AstTree.cs
- TypeUnloadedException.cs
- WorkflowClientDeliverMessageWrapper.cs
- AndMessageFilter.cs
- TimeoutValidationAttribute.cs
- InfoCardArgumentException.cs
- LambdaCompiler.cs
- ElementUtil.cs
- DataObjectMethodAttribute.cs
- XamlFigureLengthSerializer.cs
- DesignerProperties.cs
- NumberFormatInfo.cs
- ClaimComparer.cs
- Wizard.cs
- DataGridViewTextBoxCell.cs
- ApplicationServiceHelper.cs
- DataServiceKeyAttribute.cs
- TypeHelper.cs
- TemplateControlParser.cs