Source: .NET Framework 4.0 (Dev10 RTM) Reference Source — ndp/clr/src/BCL/System/Threading/Tasks/ParallelRangeManager.cs (changeset 1305376)
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ParallelRangeManager.cs
//
// Implements the algorithm for distributing loop indices to parallel loop workers
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System;
using System.Threading;
using System.Diagnostics.Contracts;

#pragma warning disable 0420

namespace System.Threading.Tasks
{
    /// <summary>
    /// Utility class for allocating structs as heap variables.
    /// </summary>
    /// <typeparam name="T">The (typically value) type to box on the heap.</typeparam>
    // NOTE(review): the scraped copy had lost the generic parameters ("class Shared{",
    // "Shared m_nSharedCurrentIndexOffset", "new Shared(0)"); restored here as
    // Shared<T> / Shared<long>, which the Interlocked.Add(ref ....Value, long) call requires.
    internal class Shared<T>
    {
        internal T Value;

        internal Shared(T value)
        {
            this.Value = value;
        }
    }

    /// <summary>
    /// Represents an index range.
    /// </summary>
    internal struct IndexRange
    {
        // the From and To values for this range. These do not change.
        internal long m_nFromInclusive;
        internal long m_nToExclusive;

        // The shared index, stored as the offset from nFromInclusive. Using an offset rather than the actual
        // value saves us from overflows that can happen due to multiple workers racing to increment this.
        // All updates to this field need to be interlocked. Lazily allocated by the first worker to
        // touch the range (see RangeWorker.FindNewWork).
        internal Shared<long> m_nSharedCurrentIndexOffset;

        // to be set to 1 by the worker that finishes this range. It's OK to do a non-interlocked write here.
        internal int m_bRangeFinished;
    }

    /// <summary>
    /// The RangeWorker struct wraps the state needed by a task that services the parallel loop.
    /// </summary>
    internal struct RangeWorker
    {
        // reference to the IndexRange array allocated by the range manager
        internal readonly IndexRange[] m_indexRanges;

        // index of the current index range that this worker is grabbing chunks from
        internal int m_nCurrentIndexRange;

        // the step for this loop. Duplicated here for quick access (rather than jumping to rangemanager)
        internal long m_nStep;

        // increment value is the current amount that this worker will use
        // to increment the shared index of the range it's working on
        internal long m_nIncrementValue;

        // the increment value is doubled each time this worker finds work, and is capped at this value
        internal readonly long m_nMaxIncrementValue;

        /// <summary>
        /// Initializes a RangeWorker struct.
        /// </summary>
        /// <param name="ranges">The shared array of index ranges created by the RangeManager.</param>
        /// <param name="nInitialRange">Index of the range this worker starts stealing chunks from.</param>
        /// <param name="nStep">The loop step; also the initial chunk increment.</param>
        internal RangeWorker(IndexRange[] ranges, int nInitialRange, long nStep)
        {
            m_indexRanges = ranges;
            m_nCurrentIndexRange = nInitialRange;
            m_nStep = nStep;
            m_nIncrementValue = nStep;
            m_nMaxIncrementValue = Parallel.DEFAULT_LOOP_STRIDE * nStep;
        }

        /// <summary>
        /// Implements the core work search algorithm that will be used for this range worker.
        /// </summary>
        /// <remarks>
        /// Usage pattern is:
        ///    1) the thread associated with this rangeworker calls FindNewWork
        ///    2) if we return true, the worker uses the nFromInclusiveLocal and nToExclusiveLocal values
        ///       to execute the sequential loop
        ///    3) if we return false it means there is no more work left. It's time to quit.
        /// </remarks>
        /// <param name="nFromInclusiveLocal">On success, the inclusive start of the claimed chunk; 0 otherwise.</param>
        /// <param name="nToExclusiveLocal">On success, the exclusive end of the claimed chunk; 0 otherwise.</param>
        /// <returns>true if a chunk of work was claimed; false if all ranges are finished.</returns>
        internal bool FindNewWork(out long nFromInclusiveLocal, out long nToExclusiveLocal)
        {
            // since we iterate over index ranges circularly, we will use the
            // count of visited ranges as our exit condition
            int numIndexRangesToVisit = m_indexRanges.Length;

            do
            {
                // local snap to save array access bounds checks in places where we only read fields
                IndexRange currentRange = m_indexRanges[m_nCurrentIndexRange];

                if (currentRange.m_bRangeFinished == 0)
                {
                    // lazily allocate the shared offset for this range; CompareExchange makes sure
                    // only one racing worker's allocation wins
                    if (m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset == null)
                    {
                        Interlocked.CompareExchange(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset,
                                                    new Shared<long>(0), null);
                    }

                    // this access needs to be on the array slot
                    long nMyOffset = Interlocked.Add(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset.Value,
                                                     m_nIncrementValue) - m_nIncrementValue;

                    if (currentRange.m_nToExclusive - currentRange.m_nFromInclusive > nMyOffset)
                    {
                        // we found work

                        nFromInclusiveLocal = currentRange.m_nFromInclusive + nMyOffset;
                        nToExclusiveLocal = nFromInclusiveLocal + m_nIncrementValue;

                        // Check for going past end of range, or wrapping
                        if ((nToExclusiveLocal > currentRange.m_nToExclusive) ||
                            (nToExclusiveLocal < currentRange.m_nFromInclusive))
                        {
                            nToExclusiveLocal = currentRange.m_nToExclusive;
                        }

                        // We will double our unit of increment until it reaches the maximum.
                        if (m_nIncrementValue < m_nMaxIncrementValue)
                        {
                            m_nIncrementValue *= 2;
                            if (m_nIncrementValue > m_nMaxIncrementValue)
                            {
                                m_nIncrementValue = m_nMaxIncrementValue;
                            }
                        }

                        return true;
                    }
                    else
                    {
                        // this index range is completed, mark it so that others can skip it quickly
                        Interlocked.Exchange(ref m_indexRanges[m_nCurrentIndexRange].m_bRangeFinished, 1);
                    }
                }

                // move on to the next index range, in circular order.
                m_nCurrentIndexRange = (m_nCurrentIndexRange + 1) % m_indexRanges.Length;
                numIndexRangesToVisit--;

            } while (numIndexRangesToVisit > 0);

            // we've visited all index ranges possible => there's no work remaining
            nFromInclusiveLocal = 0;
            nToExclusiveLocal = 0;

            return false;
        }

        /// <summary>
        /// 32 bit integer version of FindNewWork. Assumes the ranges were initialized with 32 bit values.
        /// </summary>
        internal bool FindNewWork32(out int nFromInclusiveLocal32, out int nToExclusiveLocal32)
        {
            long nFromInclusiveLocal;
            long nToExclusiveLocal;

            bool bRetVal = FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal);

            Contract.Assert((nFromInclusiveLocal <= Int32.MaxValue) && (nFromInclusiveLocal >= Int32.MinValue) &&
                            (nToExclusiveLocal <= Int32.MaxValue) && (nToExclusiveLocal >= Int32.MinValue));

            // convert to 32 bit before returning
            nFromInclusiveLocal32 = (int)nFromInclusiveLocal;
            nToExclusiveLocal32 = (int)nToExclusiveLocal;

            return bRetVal;
        }
    }

    /// <summary>
    /// Represents the entire loop operation, keeping track of workers and ranges.
    /// </summary>
    /// <remarks>
    /// The usage pattern is:
    ///    1) The Parallel loop entry function (ForWorker) creates an instance of this class
    ///    2) Every thread joining to service the parallel loop calls RegisterWorker to grab a
    ///       RangeWorker struct to wrap the state it will need to find and execute work,
    ///       and they keep interacting with that struct until the end of the loop
    /// </remarks>
    internal class RangeManager
    {
        internal readonly IndexRange[] m_indexRanges;

        internal int m_nCurrentIndexRangeToAssign;
        internal long m_nStep;

        /// <summary>
        /// Initializes a RangeManager with the given loop parameters, and the desired number of outer ranges.
        /// </summary>
        internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers)
        {
            m_nCurrentIndexRangeToAssign = 0;
            m_nStep = nStep;

            // Our signed math breaks down w/ nNumExpectedWorkers == 1. So change it to 2.
            if (nNumExpectedWorkers == 1)
                nNumExpectedWorkers = 2;

            //
            // calculate the size of each index range
            //
            ulong uSpan = (ulong)(nToExclusive - nFromInclusive);
            ulong uRangeSize = uSpan / (ulong)nNumExpectedWorkers; // rough estimate first

            uRangeSize -= uRangeSize % (ulong)nStep; // snap to multiples of nStep
                                                     // otherwise index range transitions will derail us from nStep

            if (uRangeSize == 0)
            {
                uRangeSize = (ulong)nStep;
            }

            //
            // find the actual number of index ranges we will need
            //
            Contract.Assert((uSpan / uRangeSize) < Int32.MaxValue);

            int nNumRanges = (int)(uSpan / uRangeSize);

            if (uSpan % uRangeSize != 0)
            {
                nNumRanges++;
            }

            // Convert to signed so the rest of the logic works.
            // Should be fine so long as uRangeSize < Int64.MaxValue, which we guaranteed by setting #workers >= 2.
            long nRangeSize = (long)uRangeSize;

            // allocate the array of index ranges
            m_indexRanges = new IndexRange[nNumRanges];

            long nCurrentIndex = nFromInclusive;
            for (int i = 0; i < nNumRanges; i++)
            {
                // the fromInclusive of the new index range is always on nCurrentIndex
                m_indexRanges[i].m_nFromInclusive = nCurrentIndex;
                m_indexRanges[i].m_nSharedCurrentIndexOffset = null;
                m_indexRanges[i].m_bRangeFinished = 0;

                // now increment it to find the toExclusive value for our range
                nCurrentIndex += nRangeSize;

                // detect integer overflow or range overage and snap to nToExclusive
                if (nCurrentIndex < nCurrentIndex - nRangeSize ||
                    nCurrentIndex > nToExclusive)
                {
                    // this should only happen at the last index
                    Contract.Assert(i == nNumRanges - 1);

                    nCurrentIndex = nToExclusive;
                }

                // now that the end point of the new range is calculated, assign it.
                m_indexRanges[i].m_nToExclusive = nCurrentIndex;
            }
        }

        /// <summary>
        /// The function that needs to be called by each new worker thread servicing the parallel loop
        /// in order to get a RangeWorker struct that wraps the state for finding and executing indices.
        /// </summary>
        internal RangeWorker RegisterNewWorker()
        {
            Contract.Assert(m_indexRanges != null && m_indexRanges.Length != 0);

            // hand out starting ranges round-robin so workers begin spread across the ranges
            int nInitialRange = (Interlocked.Increment(ref m_nCurrentIndexRangeToAssign) - 1) % m_indexRanges.Length;

            return new RangeWorker(m_indexRanges, nInitialRange, m_nStep);
        }
    }
}

#pragma warning restore 0420
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- RoleGroup.cs
- SAPIEngineTypes.cs
- SchemaImporterExtensionsSection.cs
- TypeDescriptionProviderAttribute.cs
- RowSpanVector.cs
- MissingMemberException.cs
- NeutralResourcesLanguageAttribute.cs
- PipelineDeploymentState.cs
- ClientScriptManager.cs
- ImageDrawing.cs
- SimpleType.cs
- DataQuery.cs
- PassportAuthenticationModule.cs
- TagMapInfo.cs
- DataSet.cs
- Viewport3DAutomationPeer.cs
- Font.cs
- ThreadSafeList.cs
- Fx.cs
- ToolStripPanelCell.cs
- InvokeSchedule.cs
- XmlSchemaImport.cs
- NullReferenceException.cs
- BamlStream.cs
- EntitySetBaseCollection.cs
- HybridDictionary.cs
- BlockCollection.cs
- SafeRightsManagementPubHandle.cs
- QuotedPrintableStream.cs
- PersistencePipeline.cs
- ConditionedDesigner.cs
- OdbcConnectionPoolProviderInfo.cs
- IndexedString.cs
- DataGridSortCommandEventArgs.cs
- NoClickablePointException.cs
- EntityCommandExecutionException.cs
- OrthographicCamera.cs
- UIHelper.cs
- ConfigXmlCDataSection.cs
- TextEditorMouse.cs
- HyperLinkStyle.cs
- StatusBarDrawItemEvent.cs
- CodeThrowExceptionStatement.cs
- IpcClientManager.cs
- DocumentPage.cs
- SQLDoubleStorage.cs
- QilVisitor.cs
- DependencyObjectPropertyDescriptor.cs
- HttpWebRequestElement.cs
- DataAdapter.cs
- DbDataRecord.cs
- ExtentCqlBlock.cs
- SubpageParaClient.cs
- TypeConverterHelper.cs
- FamilyTypefaceCollection.cs
- SecurityHeaderLayout.cs
- TPLETWProvider.cs
- Script.cs
- Size.cs
- EntitySet.cs
- TableColumn.cs
- DataGridViewDataConnection.cs
- DefaultAssemblyResolver.cs
- _NetRes.cs
- HttpHeaderCollection.cs
- CubicEase.cs
- ConfigurationElementCollection.cs
- xamlnodes.cs
- CompiledQueryCacheEntry.cs
- Triangle.cs
- SiteMapNodeItemEventArgs.cs
- WindowsRebar.cs
- ElasticEase.cs
- ScriptRef.cs
- PropertyGridCommands.cs
- DataServiceRequestOfT.cs
- TreeNode.cs
- JobDuplex.cs
- PointUtil.cs
- DrawTreeNodeEventArgs.cs
- XmlSchemaAnnotated.cs
- CompressionTransform.cs
- WaveHeader.cs
- DataRelationCollection.cs
- RelationshipConstraintValidator.cs
- TimeSpanSecondsOrInfiniteConverter.cs
- FontFaceLayoutInfo.cs
- ContentElementAutomationPeer.cs
- CodeSubDirectoriesCollection.cs
- unsafeIndexingFilterStream.cs
- HandleInitializationContext.cs
- NullableBoolConverter.cs
- EventEntry.cs
- AssemblyBuilder.cs
- CompareValidator.cs
- TraceHandler.cs
- EncodingInfo.cs
- XslCompiledTransform.cs
- CookieParameter.cs
- SqlDataSourceFilteringEventArgs.cs