Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / System.Runtime.DurableInstancing / System / Runtime / SynchronizedPool.cs / 1305376 / SynchronizedPool.cs
//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------- namespace System.Runtime { using System.Collections.Generic; using System.Security; using System.Security.Permissions; using System.Threading; // A simple synchronized pool would simply lock a stack and push/pop on return/take. // // This implementation tries to reduce locking by exploiting the case where an item // is taken and returned by the same thread, which turns out to be common in our // scenarios. // // Initially, all the quota is allocated to a global (non-thread-specific) pool, // which takes locks. As different threads take and return values, we record their IDs, // and if we detect that a thread is taking and returning "enough" on the same thread, // then we decide to "promote" the thread. When a thread is promoted, we decrease the // quota of the global pool by one, and allocate a thread-specific entry for the thread // to store it's value. Once this entry is allocated, the thread can take and return // it's value from that entry without taking any locks. Not only does this avoid // locks, but it affinitizes pooled items to a particular thread. // // There are a couple of additional things worth noting: // // It is possible for a thread that we have reserved an entry for to exit. This means // we will still have a entry allocated for it, but the pooled item stored there // will never be used. After a while, we could end up with a number of these, and // as a result we would begin to exhaust the quota of the overall pool. To mitigate this // case, we throw away the entire per-thread pool, and return all the quota back to // the global pool if we are unable to promote a thread (due to lack of space). Then // the set of active threads will be re-promoted as they take and return items. // // You may notice that the code does not immediately promote a thread, and does not // immediately throw away the entire per-thread pool when it is unable to promote a // thread. Instead, it uses counters (based on the number of calls to the pool) // and a threshold to figure out when to do these operations. In the case where the // pool to misconfigured to have too few items for the workload, this avoids constant // promoting and rebuilding of the per thread entries. // // You may also notice that we do not use interlocked methods when adjusting statistics. // Since the statistics are a heuristic as to how often something is happening, they // do not need to be perfect. // [Fx.Tag.SynchronizationObject(Blocking = false)] class SynchronizedPoolwhere T : class { const int maxPendingEntries = 128; const int maxPromotionFailures = 64; const int maxReturnsBeforePromotion = 64; const int maxThreadItemsPerProcessor = 16; Entry[] entries; GlobalPool globalPool; int maxCount; PendingEntry[] pending; int promotionFailures; public SynchronizedPool(int maxCount) { int threadCount = maxCount; int maxThreadCount = maxThreadItemsPerProcessor + SynchronizedPoolHelper.ProcessorCount; if (threadCount > maxThreadCount) { threadCount = maxThreadCount; } this.maxCount = maxCount; this.entries = new Entry[threadCount]; this.pending = new PendingEntry[4]; this.globalPool = new GlobalPool(maxCount); } object ThisLock { get { return this; } } public void Clear() { Entry[] entries = this.entries; for (int i = 0; i < entries.Length; i++) { entries[i].value = null; } globalPool.Clear(); } void HandlePromotionFailure(int thisThreadID) { int newPromotionFailures = this.promotionFailures + 1; if (newPromotionFailures >= maxPromotionFailures) { lock (ThisLock) { this.entries = new Entry[this.entries.Length]; globalPool.MaxCount = maxCount; } PromoteThread(thisThreadID); } else { this.promotionFailures = newPromotionFailures; } } bool PromoteThread(int thisThreadID) { lock (ThisLock) { for (int i = 0; i < this.entries.Length; i++) { int threadID = this.entries[i].threadID; if (threadID == thisThreadID) { return true; } else if (threadID == 0) { globalPool.DecrementMaxCount(); this.entries[i].threadID = thisThreadID; return true; } } } return false; } void RecordReturnToGlobalPool(int thisThreadID) { PendingEntry[] localPending = this.pending; for (int i = 0; i < localPending.Length; i++) { int threadID = localPending[i].threadID; if (threadID == thisThreadID) { int newReturnCount = localPending[i].returnCount + 1; if (newReturnCount >= maxReturnsBeforePromotion) { localPending[i].returnCount = 0; if (!PromoteThread(thisThreadID)) { HandlePromotionFailure(thisThreadID); } } else { localPending[i].returnCount = newReturnCount; } break; } else if (threadID == 0) { break; } } } void RecordTakeFromGlobalPool(int thisThreadID) { PendingEntry[] localPending = this.pending; for (int i = 0; i < localPending.Length; i++) { int threadID = localPending[i].threadID; if (threadID == thisThreadID) { return; } else if (threadID == 0) { lock (localPending) { if (localPending[i].threadID == 0) { localPending[i].threadID = thisThreadID; return; } } } } if (localPending.Length >= maxPendingEntries) { this.pending = new PendingEntry[localPending.Length]; } else { PendingEntry[] newPending = new PendingEntry[localPending.Length * 2]; Array.Copy(localPending, newPending, localPending.Length); this.pending = newPending; } } public bool Return(T value) { int thisThreadID = Thread.CurrentThread.ManagedThreadId; if (thisThreadID == 0) { return false; } if (ReturnToPerThreadPool(thisThreadID, value)) { return true; } return ReturnToGlobalPool(thisThreadID, value); } bool ReturnToPerThreadPool(int thisThreadID, T value) { Entry[] entries = this.entries; for (int i = 0; i < entries.Length; i++) { int threadID = entries[i].threadID; if (threadID == thisThreadID) { if (entries[i].value == null) { entries[i].value = value; return true; } else { return false; } } else if (threadID == 0) { break; } } return false; } bool ReturnToGlobalPool(int thisThreadID, T value) { RecordReturnToGlobalPool(thisThreadID); return globalPool.Return(value); } public T Take() { int thisThreadID = Thread.CurrentThread.ManagedThreadId; if (thisThreadID == 0) { return null; } T value = TakeFromPerThreadPool(thisThreadID); if (value != null) { return value; } return TakeFromGlobalPool(thisThreadID); } T TakeFromPerThreadPool(int thisThreadID) { Entry[] entries = this.entries; for (int i = 0; i < entries.Length; i++) { int threadID = entries[i].threadID; if (threadID == thisThreadID) { T value = entries[i].value; if (value != null) { entries[i].value = null; return value; } else { return null; } } else if (threadID == 0) { break; } } return null; } T TakeFromGlobalPool(int thisThreadID) { RecordTakeFromGlobalPool(thisThreadID); return globalPool.Take(); } struct Entry { public int threadID; public T value; } struct PendingEntry { public int returnCount; public int threadID; } static class SynchronizedPoolHelper { public static readonly int ProcessorCount = GetProcessorCount(); [Fx.Tag.SecurityNote(Critical = "Asserts in order to get the processor count from the environment", Safe = "This data isn't actually protected so it's ok to leak")] [SecuritySafeCritical] [EnvironmentPermission(SecurityAction.Assert, Read = "NUMBER_OF_PROCESSORS")] static int GetProcessorCount() { return Environment.ProcessorCount; } } [Fx.Tag.SynchronizationObject(Blocking = false)] class GlobalPool { Stack items; int maxCount; public GlobalPool(int maxCount) { this.items = new Stack (); this.maxCount = maxCount; } public int MaxCount { get { return maxCount; } set { lock (ThisLock) { while (items.Count > value) { items.Pop(); } maxCount = value; } } } object ThisLock { get { return this; } } public void DecrementMaxCount() { lock (ThisLock) { if (items.Count == maxCount) { items.Pop(); } maxCount--; } } public T Take() { if (this.items.Count > 0) { lock (ThisLock) { if (this.items.Count > 0) { return this.items.Pop(); } } } return null; } public bool Return(T value) { if (this.items.Count < this.MaxCount) { lock (ThisLock) { if (this.items.Count < this.MaxCount) { this.items.Push(value); return true; } } } return false; } public void Clear() { lock (ThisLock) { this.items.Clear(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. //------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------- namespace System.Runtime { using System.Collections.Generic; using System.Security; using System.Security.Permissions; using System.Threading; // A simple synchronized pool would simply lock a stack and push/pop on return/take. // // This implementation tries to reduce locking by exploiting the case where an item // is taken and returned by the same thread, which turns out to be common in our // scenarios. // // Initially, all the quota is allocated to a global (non-thread-specific) pool, // which takes locks. As different threads take and return values, we record their IDs, // and if we detect that a thread is taking and returning "enough" on the same thread, // then we decide to "promote" the thread. When a thread is promoted, we decrease the // quota of the global pool by one, and allocate a thread-specific entry for the thread // to store it's value. Once this entry is allocated, the thread can take and return // it's value from that entry without taking any locks. Not only does this avoid // locks, but it affinitizes pooled items to a particular thread. // // There are a couple of additional things worth noting: // // It is possible for a thread that we have reserved an entry for to exit. This means // we will still have a entry allocated for it, but the pooled item stored there // will never be used. After a while, we could end up with a number of these, and // as a result we would begin to exhaust the quota of the overall pool. To mitigate this // case, we throw away the entire per-thread pool, and return all the quota back to // the global pool if we are unable to promote a thread (due to lack of space). Then // the set of active threads will be re-promoted as they take and return items. // // You may notice that the code does not immediately promote a thread, and does not // immediately throw away the entire per-thread pool when it is unable to promote a // thread. Instead, it uses counters (based on the number of calls to the pool) // and a threshold to figure out when to do these operations. In the case where the // pool to misconfigured to have too few items for the workload, this avoids constant // promoting and rebuilding of the per thread entries. // // You may also notice that we do not use interlocked methods when adjusting statistics. // Since the statistics are a heuristic as to how often something is happening, they // do not need to be perfect. // [Fx.Tag.SynchronizationObject(Blocking = false)] class SynchronizedPool where T : class { const int maxPendingEntries = 128; const int maxPromotionFailures = 64; const int maxReturnsBeforePromotion = 64; const int maxThreadItemsPerProcessor = 16; Entry[] entries; GlobalPool globalPool; int maxCount; PendingEntry[] pending; int promotionFailures; public SynchronizedPool(int maxCount) { int threadCount = maxCount; int maxThreadCount = maxThreadItemsPerProcessor + SynchronizedPoolHelper.ProcessorCount; if (threadCount > maxThreadCount) { threadCount = maxThreadCount; } this.maxCount = maxCount; this.entries = new Entry[threadCount]; this.pending = new PendingEntry[4]; this.globalPool = new GlobalPool(maxCount); } object ThisLock { get { return this; } } public void Clear() { Entry[] entries = this.entries; for (int i = 0; i < entries.Length; i++) { entries[i].value = null; } globalPool.Clear(); } void HandlePromotionFailure(int thisThreadID) { int newPromotionFailures = this.promotionFailures + 1; if (newPromotionFailures >= maxPromotionFailures) { lock (ThisLock) { this.entries = new Entry[this.entries.Length]; globalPool.MaxCount = maxCount; } PromoteThread(thisThreadID); } else { this.promotionFailures = newPromotionFailures; } } bool PromoteThread(int thisThreadID) { lock (ThisLock) { for (int i = 0; i < this.entries.Length; i++) { int threadID = this.entries[i].threadID; if (threadID == thisThreadID) { return true; } else if (threadID == 0) { globalPool.DecrementMaxCount(); this.entries[i].threadID = thisThreadID; return true; } } } return false; } void RecordReturnToGlobalPool(int thisThreadID) { PendingEntry[] localPending = this.pending; for (int i = 0; i < localPending.Length; i++) { int threadID = localPending[i].threadID; if (threadID == thisThreadID) { int newReturnCount = localPending[i].returnCount + 1; if (newReturnCount >= maxReturnsBeforePromotion) { localPending[i].returnCount = 0; if (!PromoteThread(thisThreadID)) { HandlePromotionFailure(thisThreadID); } } else { localPending[i].returnCount = newReturnCount; } break; } else if (threadID == 0) { break; } } } void RecordTakeFromGlobalPool(int thisThreadID) { PendingEntry[] localPending = this.pending; for (int i = 0; i < localPending.Length; i++) { int threadID = localPending[i].threadID; if (threadID == thisThreadID) { return; } else if (threadID == 0) { lock (localPending) { if (localPending[i].threadID == 0) { localPending[i].threadID = thisThreadID; return; } } } } if (localPending.Length >= maxPendingEntries) { this.pending = new PendingEntry[localPending.Length]; } else { PendingEntry[] newPending = new PendingEntry[localPending.Length * 2]; Array.Copy(localPending, newPending, localPending.Length); this.pending = newPending; } } public bool Return(T value) { int thisThreadID = Thread.CurrentThread.ManagedThreadId; if (thisThreadID == 0) { return false; } if (ReturnToPerThreadPool(thisThreadID, value)) { return true; } return ReturnToGlobalPool(thisThreadID, value); } bool ReturnToPerThreadPool(int thisThreadID, T value) { Entry[] entries = this.entries; for (int i = 0; i < entries.Length; i++) { int threadID = entries[i].threadID; if (threadID == thisThreadID) { if (entries[i].value == null) { entries[i].value = value; return true; } else { return false; } } else if (threadID == 0) { break; } } return false; } bool ReturnToGlobalPool(int thisThreadID, T value) { RecordReturnToGlobalPool(thisThreadID); return globalPool.Return(value); } public T Take() { int thisThreadID = Thread.CurrentThread.ManagedThreadId; if (thisThreadID == 0) { return null; } T value = TakeFromPerThreadPool(thisThreadID); if (value != null) { return value; } return TakeFromGlobalPool(thisThreadID); } T TakeFromPerThreadPool(int thisThreadID) { Entry[] entries = this.entries; for (int i = 0; i < entries.Length; i++) { int threadID = entries[i].threadID; if (threadID == thisThreadID) { T value = entries[i].value; if (value != null) { entries[i].value = null; return value; } else { return null; } } else if (threadID == 0) { break; } } return null; } T TakeFromGlobalPool(int thisThreadID) { RecordTakeFromGlobalPool(thisThreadID); return globalPool.Take(); } struct Entry { public int threadID; public T value; } struct PendingEntry { public int returnCount; public int threadID; } static class SynchronizedPoolHelper { public static readonly int ProcessorCount = GetProcessorCount(); [Fx.Tag.SecurityNote(Critical = "Asserts in order to get the processor count from the environment", Safe = "This data isn't actually protected so it's ok to leak")] [SecuritySafeCritical] [EnvironmentPermission(SecurityAction.Assert, Read = "NUMBER_OF_PROCESSORS")] static int GetProcessorCount() { return Environment.ProcessorCount; } } [Fx.Tag.SynchronizationObject(Blocking = false)] class GlobalPool { Stack items; int maxCount; public GlobalPool(int maxCount) { this.items = new Stack (); this.maxCount = maxCount; } public int MaxCount { get { return maxCount; } set { lock (ThisLock) { while (items.Count > value) { items.Pop(); } maxCount = value; } } } object ThisLock { get { return this; } } public void DecrementMaxCount() { lock (ThisLock) { if (items.Count == maxCount) { items.Pop(); } maxCount--; } } public T Take() { if (this.items.Count > 0) { lock (ThisLock) { if (this.items.Count > 0) { return this.items.Pop(); } } } return null; } public bool Return(T value) { if (this.items.Count < this.MaxCount) { lock (ThisLock) { if (this.items.Count < this.MaxCount) { this.items.Push(value); return true; } } } return false; } public void Clear() { lock (ThisLock) { this.items.Clear(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- MgmtResManager.cs
- FontFamilyConverter.cs
- TripleDESCryptoServiceProvider.cs
- HtmlLink.cs
- ItemsControlAutomationPeer.cs
- COM2PropertyDescriptor.cs
- TextRangeAdaptor.cs
- WindowsBrush.cs
- ServiceCredentialsSecurityTokenManager.cs
- Matrix3DStack.cs
- XmlSchemaComplexType.cs
- Matrix.cs
- Vector3DCollectionValueSerializer.cs
- SaveFileDialog.cs
- SqlDataAdapter.cs
- QilInvokeLateBound.cs
- HtmlControlPersistable.cs
- MachineKey.cs
- DynamicContractTypeBuilder.cs
- ItemsControlAutomationPeer.cs
- LoopExpression.cs
- TimelineGroup.cs
- FileDialogCustomPlace.cs
- DesignTimeTemplateParser.cs
- NativeMethods.cs
- CodeEventReferenceExpression.cs
- EmptyElement.cs
- StringBuilder.cs
- TableLayoutPanel.cs
- PrintDocument.cs
- ListControl.cs
- DocumentPage.cs
- EndpointConfigContainer.cs
- DayRenderEvent.cs
- DataGridRowAutomationPeer.cs
- CreateParams.cs
- PeerPresenceInfo.cs
- OracleRowUpdatedEventArgs.cs
- TransformPattern.cs
- DoubleKeyFrameCollection.cs
- RawKeyboardInputReport.cs
- MediaSystem.cs
- ProtectedConfiguration.cs
- MarshalDirectiveException.cs
- DateTimeFormat.cs
- TypeUnloadedException.cs
- CodeTypeDeclarationCollection.cs
- AnimationException.cs
- DataGridViewRowsRemovedEventArgs.cs
- XmlAnyElementAttribute.cs
- WebPartCatalogAddVerb.cs
- TdsParameterSetter.cs
- TrustLevel.cs
- ConfigXmlElement.cs
- AdapterDictionary.cs
- PropertyCollection.cs
- HtmlFormWrapper.cs
- DataGridViewCellStyleChangedEventArgs.cs
- ZipPackagePart.cs
- TemplatedMailWebEventProvider.cs
- TreeNodeStyle.cs
- Encoder.cs
- OletxVolatileEnlistment.cs
- WebColorConverter.cs
- TextDecorations.cs
- Label.cs
- ControlCommandSet.cs
- ZipIOLocalFileDataDescriptor.cs
- mediaclock.cs
- UrlAuthorizationModule.cs
- XmlSchemaResource.cs
- DrawingGroupDrawingContext.cs
- CodeTypeDeclarationCollection.cs
- TwoPhaseCommit.cs
- WebHostedComPlusServiceHost.cs
- Sorting.cs
- EdmFunctionAttribute.cs
- EntityStoreSchemaFilterEntry.cs
- CompiledQuery.cs
- TransformGroup.cs
- ColorMatrix.cs
- SqlBulkCopy.cs
- RelationshipWrapper.cs
- OptimalBreakSession.cs
- ExpressionsCollectionConverter.cs
- XPathDocument.cs
- AsyncMethodInvoker.cs
- HtmlSelect.cs
- IncrementalReadDecoders.cs
- TdsEnums.cs
- OleDbDataReader.cs
- ToReply.cs
- SessionIDManager.cs
- DesignTableCollection.cs
- ConfigurationSettings.cs
- embossbitmapeffect.cs
- Parameter.cs
- PanelStyle.cs
- ControlBuilderAttribute.cs
- TypeToken.cs