Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Sys / System / Collections / Concurrent / ConcurrentBag.cs / 1305376 / ConcurrentBag.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ConcurrentBag.cs
//
// [....]
//
//
// An unordered collection that allows duplicates and that provides add and get operations.
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Runtime.Serialization;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Security.Permissions;
using System.Threading;
using System.Diagnostics.CodeAnalysis;

namespace System.Collections.Concurrent
{

/// <summary>
/// Represents a thread-safe, unordered collection of objects.
/// </summary>
/// <typeparam name="T">Specifies the type of elements in the bag.</typeparam>
/// <remarks>
/// <para>
/// Bags are useful for storing objects when ordering doesn't matter, and unlike sets, bags support
/// duplicates. <see cref="ConcurrentBag{T}"/> is a thread-safe bag implementation, optimized for
/// scenarios where the same thread will be both producing and consuming data stored in the bag.
/// </para>
/// <para>
/// <see cref="ConcurrentBag{T}"/> accepts null reference (Nothing in Visual Basic) as a valid
/// value for reference types.
/// </para>
/// <para>
/// All public and protected members of <see cref="ConcurrentBag{T}"/> are thread-safe and may be used
/// concurrently from multiple threads.
/// </para>
/// </remarks>
[Serializable]
[ComVisible(false)]
[DebuggerTypeProxy(typeof(SystemThreadingCollection_IProducerConsumerCollectionDebugView<>))]
[DebuggerDisplay("Count = {Count}")]
[HostProtection(Synchronization = true, ExternalThreading = true)]
public class ConcurrentBag<T> : IProducerConsumerCollection<T>
{

// ThreadLocalList object that contains the data per thread
[NonSerialized]
ThreadLocal<ThreadLocalList> m_locals;

// These head and tail pointers point to the first and last local lists, to allow enumeration
// over the thread-local list objects
[NonSerialized]
volatile ThreadLocalList m_headList, m_tailList;

// A global lock object, used in two cases:
// 1- To maintain the m_tailList pointer for each new list addition process (first time a thread called Add)
// 2- To freeze the bag in GetEnumerator, CopyTo, ToArray and Count members
[NonSerialized]
object m_globalListsLock;

// A flag used to tell the operations thread that it must synchronize the operation; this flag is
// set/unset within the m_globalListsLock lock
[NonSerialized]
bool m_needSync;

// Used for custom serialization.
private T[] m_serializationArray;

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentBag{T}"/> class.
/// </summary>
public ConcurrentBag()
{
    Initialize(null);
}

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentBag{T}"/>
/// class that contains elements copied from the specified collection.
/// </summary>
/// <param name="collection">The collection whose elements are copied to the new
/// <see cref="ConcurrentBag{T}"/>.</param>
/// <exception cref="ArgumentNullException"><paramref name="collection"/> is a null reference
/// (Nothing in Visual Basic).</exception>
public ConcurrentBag(IEnumerable<T> collection)
{
    if (collection == null)
    {
        throw new ArgumentNullException("collection", SR.GetString(SR.ConcurrentBag_Ctor_ArgumentNullException));
    }
    Initialize(collection);
}

/// <summary>
/// Local helper function to initialize a new bag object.
/// </summary>
/// <param name="collection">An enumeration containing items with which to initialize this bag;
/// null leaves the bag empty.</param>
private void Initialize(IEnumerable<T> collection)
{
    m_locals = new ThreadLocal<ThreadLocalList>();
    m_globalListsLock = new object();

    // Copy the collection to the bag; every item goes into the calling thread's local list
    if (collection != null)
    {
        ThreadLocalList list = GetThreadList(true);
        foreach (T item in collection)
        {
            AddInternal(list, item);
        }
    }
}

/// <summary>
/// Adds an object to the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <param name="item">The object to be added to the
/// <see cref="ConcurrentBag{T}"/>. The value can be a null reference
/// (Nothing in Visual Basic) for reference types.</param>
public void Add(T item)
{
    // Get the local list for this thread; create a new list if this thread has none yet
    // (first time it calls Add)
    ThreadLocalList list = GetThreadList(true);
    AddInternal(list, item);
}

/// <summary>
/// Adds an item to the given thread-local list, taking the list's lock only when needed.
/// </summary>
/// <param name="list">The local list that receives the item.</param>
/// <param name="item">The item to add.</param>
private void AddInternal(ThreadLocalList list, T item)
{
    bool lockTaken = false;
    try
    {
        // Publish that an Add is in flight on this list.
#pragma warning disable 0420
        Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add);
#pragma warning restore 0420

        // Synchronization cases:
        //  - the list count is less than two, to avoid conflict with any stealing thread
        //  - m_needSync is set, which means there is a thread that needs to freeze the bag
        if (list.Count < 2 || m_needSync)
        {
            // reset it back to None to avoid deadlock with a stealing thread
            list.m_currentOp = (int)ListOperation.None;
            Monitor.Enter(list, ref lockTaken);
        }

        // lockTaken doubles as the "updateCount" flag: counters are folded only under the lock
        list.Add(item, lockTaken);
    }
    finally
    {
        list.m_currentOp = (int)ListOperation.None;
        if (lockTaken)
        {
            Monitor.Exit(list);
        }
    }
}

/// <summary>
/// Attempts to add an object to the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <param name="item">The object to be added to the
/// <see cref="ConcurrentBag{T}"/>. The value can be a null reference
/// (Nothing in Visual Basic) for reference types.</param>
/// <returns>Always returns true.</returns>
bool IProducerConsumerCollection<T>.TryAdd(T item)
{
    Add(item);
    return true;
}

/// <summary>
/// Attempts to remove and return an object from the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <param name="result">When this method returns, <paramref name="result"/> contains the object
/// removed from the <see cref="ConcurrentBag{T}"/> or the default value
/// of <typeparamref name="T"/> if the operation failed.</param>
/// <returns>true if an object was removed successfully; otherwise, false.</returns>
public bool TryTake(out T result)
{
    return TryTakeOrPeek(out result, true);
}

/// <summary>
/// Attempts to return an object from the <see cref="ConcurrentBag{T}"/> without removing it.
/// </summary>
/// <param name="result">When this method returns, <paramref name="result"/> contains an object from
/// the <see cref="ConcurrentBag{T}"/> or the default value of
/// <typeparamref name="T"/> if the operation failed.</param>
/// <returns>true if an object was returned successfully; otherwise, false.</returns>
public bool TryPeek(out T result)
{
    return TryTakeOrPeek(out result, false);
}

/// <summary>
/// Local helper function to Take or Peek an item from the bag.
/// </summary>
/// <param name="result">To receive the item retrieved from the bag.</param>
/// <param name="take">True means Take operation, false means Peek operation.</param>
/// <returns>True if succeeded, false otherwise.</returns>
private bool TryTakeOrPeek(out T result, bool take)
{
    // Get the local list for this thread; null if the thread has no list
    // (this thread never added before)
    ThreadLocalList list = GetThreadList(false);
    if (list == null || list.Count == 0)
    {
        return Steal(out result, take);
    }

    bool lockTaken = false;
    try
    {
        if (take) // Take operation
        {
#pragma warning disable 0420
            Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Take);
#pragma warning restore 0420

            // Synchronization cases:
            //  - the list count is less than or equal to two, to avoid conflict with any stealing thread
            //  - m_needSync is set, which means there is a thread that needs to freeze the bag
            if (list.Count <= 2 || m_needSync)
            {
                // reset it back to None to avoid deadlock with a stealing thread
                list.m_currentOp = (int)ListOperation.None;
                Monitor.Enter(list, ref lockTaken);

                // Double check the count and steal if it became empty
                if (list.Count == 0)
                {
                    // Release the lock before stealing
                    if (lockTaken)
                    {
                        try { }
                        finally
                        {
                            // reset lockTaken to avoid calling Monitor.Exit again in the outer finally block
                            lockTaken = false;
                            Monitor.Exit(list);
                        }
                    }
                    return Steal(out result, true);
                }
            }
            list.Remove(out result);
        }
        else // Peek operation
        {
            if (!list.Peek(out result))
            {
                return Steal(out result, false);
            }
        }
    }
    finally
    {
        list.m_currentOp = (int)ListOperation.None;
        if (lockTaken)
        {
            Monitor.Exit(list);
        }
    }
    return true;
}

/// <summary>
/// Local helper function to retrieve the calling thread's local list.
/// </summary>
/// <param name="forceCreate">Create (or adopt) a list if the calling thread does not have one.</param>
/// <returns>The local list object, or null when there is none and <paramref name="forceCreate"/> is false.</returns>
private ThreadLocalList GetThreadList(bool forceCreate)
{
    ThreadLocalList list = m_locals.Value;
    if (list != null)
    {
        return list;
    }
    else if (forceCreate)
    {
        // Acquire the lock to update the m_tailList pointer
        lock (m_globalListsLock)
        {
            if (m_headList == null)
            {
                // First list in the bag
                list = new ThreadLocalList(Thread.CurrentThread);
                m_headList = list;
                m_tailList = list;
            }
            else
            {
                // Prefer reusing a list whose owner thread has stopped; otherwise append a new one
                list = GetUnownedList();
                if (list == null)
                {
                    list = new ThreadLocalList(Thread.CurrentThread);
                    m_tailList.m_nextList = list;
                    m_tailList = list;
                }
            }
            m_locals.Value = list;
        }
    }
    else
    {
        return null;
    }
    Debug.Assert(list != null);
    return list;
}

/// <summary>
/// Try to reuse an unowned list if one exists.
/// Unowned lists are the lists whose owner threads are aborted or terminated;
/// this is a workaround to avoid memory leaks.
/// </summary>
/// <returns>The list object, null if all lists are owned.</returns>
private ThreadLocalList GetUnownedList()
{
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)
        {
            // the caller should acquire a lock to make this line thread safe
            currentList.m_ownerThread = Thread.CurrentThread;
            return currentList;
        }
        currentList = currentList.m_nextList;
    }
    return null;
}

/// <summary>
/// Local helper method to steal an item from any other non-empty thread-local list.
/// It walks every local list, recording each list's version and trying to steal from the
/// non-empty ones; it then re-checks the recorded versions and repeats the whole scan if
/// any list's version changed in the meantime.
/// </summary>
/// <param name="result">To receive the item retrieved from the bag.</param>
/// <param name="take">Whether to remove or peek.</param>
/// <returns>True if succeeded, false otherwise.</returns>
private bool Steal(out T result, bool take)
{
#if !FEATURE_PAL // PAL doesn't support eventing
    if (take)
        CDSCollectionETWBCLProvider.Log.ConcurrentBag_TryTakeSteals();
    else
        CDSCollectionETWBCLProvider.Log.ConcurrentBag_TryPeekSteals();
#endif

    bool loop;
    // Allocated once and cleared on every retry instead of being re-allocated per pass.
    List<int> versionsList = new List<int>(); // save the lists version
    do
    {
        versionsList.Clear();
        loop = false;

        // First walk: remember each list's version and try to steal from non-empty lists.
        ThreadLocalList currentList = m_headList;
        while (currentList != null)
        {
            versionsList.Add(currentList.m_version);
            if (currentList.m_head != null && TrySteal(currentList, out result, take))
            {
                return true;
            }
            currentList = currentList.m_nextList;
        }

        // Verify versioning: if items were added to a list since we last visited it, we should retry.
        // Only the lists seen in the first walk are re-checked (one recorded version per list).
        currentList = m_headList;
        foreach (int version in versionsList)
        {
            if (version != currentList.m_version) // state changed
            {
                loop = true;
                if (currentList.m_head != null && TrySteal(currentList, out result, take))
                {
                    return true;
                }
            }
            currentList = currentList.m_nextList;
        }
    } while (loop);

    result = default(T);
    return false;
}

/// <summary>
/// Local helper function that tries to steal an item from the given local list
/// while holding that list's lock.
/// </summary>
/// <param name="list">The list to steal from.</param>
/// <param name="result">To receive the stolen item, or default(T) on failure.</param>
/// <param name="take">Whether to remove or peek.</param>
/// <returns>True if an item was obtained, false otherwise.</returns>
private bool TrySteal(ThreadLocalList list, out T result, bool take)
{
    lock (list)
    {
        if (CanSteal(list))
        {
            list.Steal(out result, take);
            return true;
        }
        result = default(T);
        return false;
    }
}

/// <summary>
/// Local helper function to check whether the list became empty after acquiring the lock,
/// waiting first for any unsynchronized Add/Take operation on the list to finish.
/// </summary>
/// <param name="list">The list to steal from.</param>
/// <returns>True if can steal, false otherwise.</returns>
private bool CanSteal(ThreadLocalList list)
{
    // With few items left, an in-flight unsynchronized operation may touch the same nodes,
    // so spin until the owner resets m_currentOp.
    if (list.Count <= 2 && list.m_currentOp != (int)ListOperation.None)
    {
        SpinWait spinner = new SpinWait();
        while (list.m_currentOp != (int)ListOperation.None)
        {
            spinner.SpinOnce();
        }
    }
    return list.Count > 0;
}

/// <summary>
/// Copies the <see cref="ConcurrentBag{T}"/> elements to an existing
/// one-dimensional Array, starting at the specified array index.
/// </summary>
/// <param name="array">The one-dimensional Array that is the
/// destination of the elements copied from the <see cref="ConcurrentBag{T}"/>.</param>
/// The destination Array must have zero-based indexing.
/// <param name="index">The zero-based index in <paramref name="array"/> at which copying
/// begins.</param>
/// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in
/// Visual Basic).</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than
/// zero.</exception>
/// <exception cref="ArgumentException"><paramref name="index"/> is equal to or greater than the
/// length of the <paramref name="array"/>
/// -or- the number of elements in the source <see cref="ConcurrentBag{T}"/> is greater than the
/// available space from <paramref name="index"/> to the end of the destination
/// <paramref name="array"/>.</exception>
public void CopyTo(T[] array, int index)
{
    if (array == null)
    {
        throw new ArgumentNullException("array", SR.GetString(SR.ConcurrentBag_CopyTo_ArgumentNullException));
    }
    if (index < 0)
    {
        throw new ArgumentOutOfRangeException
            ("index", SR.GetString(SR.ConcurrentBag_CopyTo_ArgumentOutOfRangeException));
    }

    // Short path if the bag is empty
    if (m_headList == null)
        return;

    bool lockTaken = false;
    try
    {
        // Freeze so the snapshot taken by ToList is consistent across all local lists
        FreezeBag(ref lockTaken);
        ToList().CopyTo(array, index);
    }
    finally
    {
        UnfreezeBag(lockTaken);
    }
}

/// <summary>
/// Copies the elements of the <see cref="T:System.Collections.ICollection"/> to an
/// Array, starting at a particular Array index.
/// </summary>
/// <param name="array">The one-dimensional Array that is the
/// destination of the elements copied from the
/// <see cref="ConcurrentBag{T}"/>. The Array must have zero-based indexing.</param>
/// <param name="index">The zero-based index in <paramref name="array"/> at which copying
/// begins.</param>
/// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in
/// Visual Basic).</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than
/// zero.</exception>
/// <exception cref="ArgumentException">
/// <paramref name="array"/> is multidimensional. -or-
/// <paramref name="array"/> does not have zero-based indexing. -or-
/// <paramref name="index"/> is equal to or greater than the length of the <paramref name="array"/>
/// -or- The number of elements in the source collection is
/// greater than the available space from <paramref name="index"/> to the end of the destination
/// <paramref name="array"/>. -or- The type of the source collection cannot be cast automatically
/// to the type of the destination <paramref name="array"/>.
/// </exception>
void ICollection.CopyTo(Array array, int index)
{
    if (array == null)
    {
        throw new ArgumentNullException("array", SR.GetString(SR.ConcurrentBag_CopyTo_ArgumentNullException));
    }

    bool lockTaken = false;
    try
    {
        FreezeBag(ref lockTaken);
        // Remaining argument validation is delegated to the List's non-generic CopyTo
        ((ICollection)ToList()).CopyTo(array, index);
    }
    finally
    {
        UnfreezeBag(lockTaken);
    }
}

/// <summary>
/// Copies the <see cref="ConcurrentBag{T}"/> elements to a new array.
/// </summary>
/// <returns>A new array containing a snapshot of elements copied from the
/// <see cref="ConcurrentBag{T}"/>.</returns>
public T[] ToArray()
{
    // Short path if the bag is empty
    if (m_headList == null)
        return new T[0];

    bool lockTaken = false;
    try
    {
        FreezeBag(ref lockTaken);
        return ToList().ToArray();
    }
    finally
    {
        UnfreezeBag(lockTaken);
    }
}

/// <summary>
/// Returns an enumerator that iterates through the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <returns>An enumerator for the contents of the <see cref="ConcurrentBag{T}"/>.</returns>
/// <remarks>
/// The enumeration represents a moment-in-time snapshot of the contents
/// of the bag. It does not reflect any updates to the collection after
/// <see cref="GetEnumerator"/> was called. The enumerator is safe to use
/// concurrently with reads from and writes to the bag.
/// </remarks>
public IEnumerator<T> GetEnumerator()
{
    // Copy the items to a new array and enumerate the array
    T[] array = ToArray();
    return ((IEnumerable<T>)array).GetEnumerator();
}

/// <summary>
/// Returns an enumerator that iterates through the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <returns>An enumerator for the contents of the <see cref="ConcurrentBag{T}"/>.</returns>
/// <remarks>
/// The items enumerated represent a moment-in-time snapshot of the contents
/// of the bag. It does not reflect any update to the collection after
/// <see cref="GetEnumerator"/> was called.
/// </remarks>
IEnumerator IEnumerable.GetEnumerator()
{
    return ((ConcurrentBag<T>)this).GetEnumerator();
}

/// <summary>
/// Get the data array to be serialized.
/// </summary>
[OnSerializing]
private void OnSerializing(StreamingContext context)
{
    // save the data into the serialization array to be saved
    m_serializationArray = ToArray();
}

/// <summary>
/// Construct the bag from a previously serialized one.
/// </summary>
[OnDeserialized]
private void OnDeserialized(StreamingContext context)
{
    // The non-serialized infrastructure has to be rebuilt before items can be added back
    m_locals = new ThreadLocal<ThreadLocalList>();
    m_globalListsLock = new object();

    ThreadLocalList list = GetThreadList(true);
    foreach (T item in m_serializationArray)
    {
        AddInternal(list, item);
    }
    m_headList = list;
    m_tailList = list;

    // The temporary array is no longer needed once its items are back in the bag
    m_serializationArray = null;
}

/// <summary>
/// Gets the number of elements contained in the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <value>The number of elements contained in the <see cref="ConcurrentBag{T}"/>.</value>
/// <remarks>
/// The count returned represents a moment-in-time snapshot of the contents
/// of the bag.
/// </remarks>
/// <remarks>It does not reflect any updates to the collection made after the count was read.</remarks>
public int Count
{
    get
    {
        // Short path if the bag is empty
        if (m_headList == null)
            return 0;

        bool lockTaken = false;
        try
        {
            FreezeBag(ref lockTaken);
            return GetCountInternal();
        }
        finally
        {
            UnfreezeBag(lockTaken);
        }
    }
}

/// <summary>
/// Gets a value that indicates whether the <see cref="ConcurrentBag{T}"/> is empty.
/// </summary>
/// <value>true if the <see cref="ConcurrentBag{T}"/> is empty; otherwise, false.</value>
public bool IsEmpty
{
    get
    {
        if (m_headList == null)
            return true;

        bool lockTaken = false;
        try
        {
            FreezeBag(ref lockTaken);
            ThreadLocalList currentList = m_headList;
            while (currentList != null)
            {
                if (currentList.m_head != null)
                {
                    // at least this list is not empty, so the bag is not empty
                    return false;
                }
                currentList = currentList.m_nextList;
            }
            return true;
        }
        finally
        {
            UnfreezeBag(lockTaken);
        }
    }
}

/// <summary>
/// Gets a value indicating whether access to the <see cref="T:System.Collections.ICollection"/> is
/// synchronized with the SyncRoot.
/// </summary>
/// <value>true if access to the <see cref="T:System.Collections.ICollection"/> is synchronized
/// with the SyncRoot; otherwise, false. For <see cref="ConcurrentBag{T}"/>, this property always
/// returns false.</value>
bool ICollection.IsSynchronized
{
    get { return false; }
}

/// <summary>
/// Gets an object that can be used to synchronize access to the
/// <see cref="T:System.Collections.ICollection"/>. This property is not supported.
/// </summary>
/// <exception cref="NotSupportedException">The SyncRoot property is not supported.</exception>
object ICollection.SyncRoot
{
    get
    {
        throw new NotSupportedException(SR.GetString(SR.ConcurrentCollection_SyncRoot_NotSupported));
    }
}

#region Freeze bag helper methods

/// <summary>
/// Local helper method to freeze all bag operations. It
/// 1- Acquires the global lock to prevent any other thread from freezing the bag, and also
///    prevents new thread lists from being added
/// 2- Then acquires all local list locks to prevent steal and synchronized operations
/// 3- Waits for all un-synchronized operations to be done
/// </summary>
/// <param name="lockTaken">Receives the lock-taken result for the global lock, to be passed to
/// the Unfreeze method.</param>
private void FreezeBag(ref bool lockTaken)
{
    // global lock to be safe against multiple threads calling Count and corrupting m_needSync
    Monitor.Enter(m_globalListsLock, ref lockTaken);

    // This will force any future add/take operation to be synchronized
    m_needSync = true;

    // Acquire all local lists locks
    AcquireAllLocks();

    // Wait for all un-synchronized operations to be done
    WaitAllOperations();
}

/// <summary>
/// Local helper method to unfreeze the bag from a frozen state.
/// </summary>
/// <param name="lockTaken">The lock-taken result from the Freeze method.</param>
private void UnfreezeBag(bool lockTaken)
{
    ReleaseAllLocks();
    m_needSync = false;
    if (lockTaken)
    {
        Monitor.Exit(m_globalListsLock);
    }
}

/// <summary>
/// Local helper method to acquire all local lists locks.
/// </summary>
private void AcquireAllLocks()
{
    bool lockTaken = false;
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        // Try/finally block to avoid a thread abort between acquiring the lock and setting the taken flag
        try
        {
            Monitor.Enter(currentList, ref lockTaken);
        }
        finally
        {
            if (lockTaken)
            {
                // Remember on the list itself that this freeze owns its lock
                currentList.m_lockTaken = true;
                lockTaken = false;
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// Local helper method to release all local lists locks.
/// </summary>
private void ReleaseAllLocks()
{
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        // Only exit the locks that AcquireAllLocks flagged as taken
        if (currentList.m_lockTaken)
        {
            currentList.m_lockTaken = false;
            Monitor.Exit(currentList);
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// Local helper function to wait for all unsynchronized operations.
/// </summary>
private void WaitAllOperations()
{
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        if (currentList.m_currentOp != (int)ListOperation.None)
        {
            SpinWait spinner = new SpinWait();
            while (currentList.m_currentOp != (int)ListOperation.None)
            {
                spinner.SpinOnce();
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// Local helper function to get the bag count; the caller should call it from a Freeze/Unfreeze block.
/// </summary>
/// <returns>The current bag count.</returns>
private int GetCountInternal()
{
    int count = 0;
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        // checked: overflow of the total throws instead of wrapping
        checked
        {
            count += currentList.Count;
        }
        currentList = currentList.m_nextList;
    }
    return count;
}

/// <summary>
/// Local helper function to return the bag items in a list; this is mainly used by CopyTo and ToArray.
/// This is not thread safe and should be called in a Freeze/Unfreeze bag block.
/// </summary>
/// <returns>List that contains the bag items.</returns>
private List<T> ToList()
{
    List<T> list = new List<T>();
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        // Walk each local list from head to tail
        Node currentNode = currentList.m_head;
        while (currentNode != null)
        {
            list.Add(currentNode.m_value);
            currentNode = currentNode.m_next;
        }
        currentList = currentList.m_nextList;
    }
    return list;
}

#endregion

#region Inner Classes

/// <summary>
/// A class that represents a node in the local thread list.
/// </summary>
[Serializable]
internal class Node
{
    public Node(T value)
    {
        m_value = value;
    }

    public T m_value;
    public Node m_next;
    public Node m_prev;
}

/// <summary>
/// A class that represents the local thread list.
/// </summary>
internal class ThreadLocalList
{
    // Head node in the list, null means the list is empty
    internal Node m_head;

    // Tail node for the list
    private Node m_tail;

    // The current list operation
    internal volatile int m_currentOp;

    // The list count from the Add/Take perspective
    private int m_count;

    // The stealing count
    internal int m_stealCount;

    // Next list in the chain of thread-local lists
    internal ThreadLocalList m_nextList;

    // Set if the local lock is taken
    internal bool m_lockTaken;

    // The owner thread for this list
    internal Thread m_ownerThread;

    // The version of the list, incremented only when the list changes from empty to non-empty state
    internal volatile int m_version;

    /// <summary>
    /// ThreadLocalList constructor.
    /// </summary>
    /// <param name="ownerThread">The owner thread for this list.</param>
    internal ThreadLocalList(Thread ownerThread)
    {
        m_ownerThread = ownerThread;
    }

    /// <summary>
    /// Add a new item to the head of the list.
    /// </summary>
    /// <param name="item">The item to add.</param>
    /// <param name="updateCount">Whether to fold the steal count into the count.</param>
    internal void Add(T item, bool updateCount)
    {
        checked
        {
            m_count++;
        }

        Node node = new Node(item);
        if (m_head == null)
        {
            Debug.Assert(m_tail == null);
            m_head = node;
            m_tail = node;
            m_version++; // changing from empty state to non-empty state
        }
        else
        {
            node.m_next = m_head;
            m_head.m_prev = node;
            m_head = node;
        }

        if (updateCount) // update the count to avoid overflow if this add is synchronized
        {
            m_count = m_count - m_stealCount;
            m_stealCount = 0;
        }
    }

    /// <summary>
    /// Remove an item from the head of the list.
    /// </summary>
    /// <param name="result">The removed item.</param>
    internal void Remove(out T result)
    {
        Debug.Assert(m_head != null);
        Node head = m_head;
        m_head = m_head.m_next;
        if (m_head != null)
        {
            m_head.m_prev = null;
        }
        else
        {
            // The list became empty
            m_tail = null;
        }
        m_count--;
        result = head.m_value;
    }

    /// <summary>
    /// Peek an item from the head of the list.
    /// </summary>
    /// <param name="result">The peeked item.</param>
    /// <returns>True if succeeded, false otherwise.</returns>
    internal bool Peek(out T result)
    {
        Node head = m_head;
        if (head != null)
        {
            result = head.m_value;
            return true;
        }
        result = default(T);
        return false;
    }

    /// <summary>
    /// Steal an item from the tail of the list.
    /// </summary>
    /// <param name="result">The removed (or peeked) item.</param>
    /// <param name="remove">Remove or peek flag.</param>
    internal void Steal(out T result, bool remove)
    {
        Node tail = m_tail;
        Debug.Assert(tail != null);
        if (remove) // Take operation
        {
            m_tail = m_tail.m_prev;
            if (m_tail != null)
            {
                m_tail.m_next = null;
            }
            else
            {
                // The list became empty
                m_head = null;
            }
            // Increment the steal count
            m_stealCount++;
        }
        result = tail.m_value;
    }

    /// <summary>
    /// Gets the total list count; it's not thread safe and may provide an incorrect count
    /// if it is called concurrently.
    /// </summary>
    internal int Count
    {
        get
        {
            return m_count - m_stealCount;
        }
    }
}

/// <summary>
/// List operations.
/// </summary>
internal enum ListOperation
{
    None,
    Add,
    Take
};

#endregion

}

#region Internal Types

/// <summary>
/// A simple class for the debugger view window.
/// </summary>
internal sealed class SystemThreadingCollection_IProducerConsumerCollectionDebugView<T>
{
    IProducerConsumerCollection<T> m_collection;

    public SystemThreadingCollection_IProducerConsumerCollectionDebugView(IProducerConsumerCollection<T> collection)
    {
        if (collection == null)
        {
            throw new ArgumentNullException("collection");
        }
        m_collection = collection;
    }

    // Snapshot of the wrapped collection's elements, obtained through ToArray
    public T[] Items
    {
        get { return m_collection.ToArray(); }
    }
}

#endregion

}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007. // ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // ConcurrentBag.cs // // [....] // // //An unordered collection that allows duplicates and that provides add and get operations. // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System; using System.Collections; using System.Collections.Generic; using System.Collections.Concurrent; using System.Runtime.Serialization; using System.Runtime.InteropServices; using System.Diagnostics; using System.Security.Permissions; using System.Threading; using System.Diagnostics.CodeAnalysis; namespace System.Collections.Concurrent { ////// Represents an thread-safe, unordered collection of objects. /// ///Specifies the type of elements in the bag.
////// [Serializable] [ComVisible(false)] [DebuggerTypeProxy(typeof(SystemThreadingCollection_IProducerConsumerCollectionDebugView<>))] [DebuggerDisplay("Count = {Count}")] [HostProtection(Synchronization = true, ExternalThreading = true)] public class ConcurrentBag/// Bags are useful for storing objects when ordering doesn't matter, and unlike sets, bags support /// duplicates. ///is a thread-safe bag implementation, optimized for /// scenarios where the same thread will be both producing and consuming data stored in the bag. /// /// ///accepts null reference (Nothing in Visual Basic) as a valid /// value for reference types. /// /// All public and protected members of ///are thread-safe and may be used /// concurrently from multiple threads. /// : IProducerConsumerCollection { // ThreadLocalList object that contains the data per thread [NonSerialized] ThreadLocal m_locals; // This head and tail pointers points to the first and last local lists, to allow enumeration on the thread locals objects [NonSerialized] volatile ThreadLocalList m_headList, m_tailList; // A global lock object, used in two cases: // 1- To maintain the m_tailList pointer for each new list addition process ( first time a thread called Add ) // 2- To freeze the bag in GetEnumerator, CopyTo, ToArray and Count members [NonSerialized] object m_globalListsLock; // A flag used to tell the operations thread that it must synchronize the operation, this flag is set/unset within // m_globalListsLock lock [NonSerialized] bool m_needSync; // Used for custom serialization. private T[] m_serializationArray; /// /// Initializes a new instance of the public ConcurrentBag() { Initialize(null); } ////// class. /// /// Initializes a new instance of the /// The collection whose elements are copied to the new/// class that contains elements copied from the specified collection. /// . /// public ConcurrentBag(IEnumerable is a null reference /// (Nothing in Visual Basic). 
collection) { if (collection == null) { throw new ArgumentNullException("collection", SR.GetString(SR.ConcurrentBag_Ctor_ArgumentNullException)); } Initialize(collection); } /// /// Local helper function to initalize a new bag object /// /// An enumeration containing items with which to initialize this bag. private void Initialize(IEnumerablecollection) { m_locals = new ThreadLocal (); m_globalListsLock = new object(); // Copy the collection to the bag if (collection != null) { ThreadLocalList list = GetThreadList(true); foreach (T item in collection) { AddInternal(list, item); } } } /// /// Adds an object to the /// The object to be added to the ///. /// . The value can be a null reference /// (Nothing in Visual Basic) for reference types. public void Add(T item) { // Get the local list for that thread, create a new list if this thread doesn't exist //(first time to call add) ThreadLocalList list = GetThreadList(true); AddInternal(list, item); } /// /// /// /// private void AddInternal(ThreadLocalList list, T item) { bool lockTaken = false; try { #pragma warning disable 0420 Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add); #pragma warning restore 0420 //Synchronization cases: // if the list count is less than two to avoid conflict with any stealing thread // if m_needSync is set, this means there is a thread that needs to freeze the bag if (list.Count < 2 || m_needSync) { // reset it back to zero to avoid deadlock with stealing thread list.m_currentOp = (int)ListOperation.None; Monitor.Enter(list, ref lockTaken); } list.Add(item, lockTaken); } finally { list.m_currentOp = (int)ListOperation.None; if (lockTaken) { Monitor.Exit(list); } } } ////// Attempts to add an object to the /// The object to be added to the ///. /// . The value can be a null reference /// (Nothing in Visual Basic) for reference types. 
/// Always returns true bool IProducerConsumerCollection.TryAdd(T item) { Add(item); return true; } /// /// Attempts to remove and return an object from the /// When this method returns,. /// contains the object /// removed from the or the default value /// of if the operation failed. /// true if an object was removed successfully; otherwise, false. public bool TryTake(out T result) { return TryTakeOrPeek(out result, true); } ////// Attempts to return an object from the /// When this method returns,/// without removing it. /// contains an object from /// the or the default value of /// if the operation failed. /// true if and object was returned successfully; otherwise, false. public bool TryPeek(out T result) { return TryTakeOrPeek(out result, false); } ////// Local helper function to Take or Peek an item from the bag /// /// To receive the item retrieved from the bag /// True means Take operation, false means Peek operation ///True if succeeded, false otherwise private bool TryTakeOrPeek(out T result, bool take) { // Get the local list for that thread, return null if the thread doesn't exit //(this thread never add before) ThreadLocalList list = GetThreadList(false); if (list == null || list.Count == 0) { return Steal(out result, take); } bool lockTaken = false; try { if (take) // Take operation { #pragma warning disable 0420 Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Take); #pragma warning restore 0420 //Synchronization cases: // if the list count is less than or equal two to avoid conflict with any stealing thread // if m_needSync is set, this means there is a thread that needs to freeze the bag if (list.Count <= 2 || m_needSync) { // reset it back to zero to avoid deadlock with stealing thread list.m_currentOp = (int)ListOperation.None; Monitor.Enter(list, ref lockTaken); // Double check the count and steal if it became empty if (list.Count == 0) { // Release the lock before stealing if (lockTaken) { try { } finally { lockTaken = false; // 
reset lockTaken to avoid calling Monitor.Exit again in the finally block Monitor.Exit(list); } } return Steal(out result, true); } } list.Remove(out result); } else { if (!list.Peek(out result)) { return Steal(out result, false); } } } finally { list.m_currentOp = (int)ListOperation.None; if (lockTaken) { Monitor.Exit(list); } } return true; } ////// Local helper function to retrieve a thread local list by a thread object /// /// Create a new list if the thread does ot exist ///The local list object private ThreadLocalList GetThreadList(bool forceCreate) { ThreadLocalList list = m_locals.Value; if (list != null) { return list; } else if (forceCreate) { // Acquire the lock to update the m_tailList pointer lock (m_globalListsLock) { if (m_headList == null) { list = new ThreadLocalList(Thread.CurrentThread); m_headList = list; m_tailList = list; } else { list = GetUnownedList(); if (list == null) { list = new ThreadLocalList(Thread.CurrentThread); m_tailList.m_nextList = list; m_tailList = list; } } m_locals.Value = list; } } else { return null; } Debug.Assert(list != null); return list; } ////// Try to reuse an unowned list if exist /// unowned lists are the lists that their owner threads are aborted or terminated /// this is workaround to avoid memory leaks. 
/// <returns>The list object, null if all lists are owned</returns>
private ThreadLocalList GetUnownedList()
{
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)
        {
            // the caller should acquire a lock to make this line thread safe
            currentList.m_ownerThread = Thread.CurrentThread;
            return currentList;
        }
        currentList = currentList.m_nextList;
    }
    return null;
}

/// <summary>
/// Local helper method to steal an item from any non-empty thread list.
/// Every pass walks all lists, recording each list's version and trying to steal from it;
/// afterwards the recorded versions are compared with the current ones and, if any list
/// changed in the meantime, another pass is made.
/// </summary>
/// <param name="result">To receive the item retrieved from the bag</param>
/// <param name="take">Whether to remove or peek.</param>
/// <returns>True if succeeded, false otherwise.</returns>
private bool Steal(out T result, bool take)
{
#if !FEATURE_PAL    // PAL doesn't support eventing
    if (take)
        CDSCollectionETWBCLProvider.Log.ConcurrentBag_TryTakeSteals();
    else
        CDSCollectionETWBCLProvider.Log.ConcurrentBag_TryPeekSteals();
#endif

    bool loop;
    do
    {
        loop = false;
        List<int> versionsList = new List<int>(); // save the lists version

        ThreadLocalList currentList = m_headList;
        while (currentList != null)
        {
            versionsList.Add(currentList.m_version);
            if (currentList.m_head != null && TrySteal(currentList, out result, take))
            {
                return true;
            }
            currentList = currentList.m_nextList;
        }

        // verify versioning, if other items were added to a list since we last visited it, we should retry
        currentList = m_headList;
        foreach (int version in versionsList)
        {
            if (version != currentList.m_version) // oops, state changed
            {
                loop = true;
                if (currentList.m_head != null && TrySteal(currentList, out result, take))
                    return true;
            }
            currentList = currentList.m_nextList;
        }
    } while (loop);

    result = default(T);
    return false;
}

/// <summary>
/// Local helper function that tries to steal an item from the given local list
/// while holding that list's lock.
/// </summary>
/// <param name="list">The list to steal from</param>
/// <param name="result">To receive the stolen item, or default(T) on failure</param>
/// <param name="take">Whether to remove or peek.</param>
/// <returns>True if an item was obtained, false otherwise</returns>
private bool TrySteal(ThreadLocalList list, out T result, bool take)
{
    lock (list)
    {
        if (CanSteal(list))
        {
            list.Steal(out result, take);
            return true;
        }
        result = default(T);
        return false;
    }
}

/// <summary>
/// Local helper function to check whether the list became empty after acquiring the lock,
/// and to wait if there is an unsynchronized Add/Take operation in the list to be done
/// </summary>
/// <param name="list">The list to steal</param>
/// <returns>True if can steal, false otherwise</returns>
private bool CanSteal(ThreadLocalList list)
{
    // With two or fewer items an in-flight owner operation may touch the same node,
    // so spin until the owner marks its operation as finished.
    if (list.Count <= 2 && list.m_currentOp != (int)ListOperation.None)
    {
        SpinWait spinner = new SpinWait();
        while (list.m_currentOp != (int)ListOperation.None)
        {
            spinner.SpinOnce();
        }
    }
    if (list.Count > 0)
    {
        return true;
    }
    return false;
}

/// <summary>
/// Copies the <see cref="ConcurrentBag{T}"/> elements to an existing one-dimensional
/// <see cref="T:System.Array">Array</see>, starting at the specified array index.
/// </summary>
/// <param name="array">The one-dimensional <see cref="T:System.Array">Array</see> that is the
/// destination of the elements copied from the bag. The Array must have zero-based indexing.</param>
/// <param name="index">The zero-based index in <paramref name="array"/> at which copying
/// begins.</param>
/// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in
/// Visual Basic).</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than
/// zero.</exception>
/// <exception cref="ArgumentException"><paramref name="index"/> is equal to or greater than the
/// length of the <paramref name="array"/>
/// -or- the number of elements in the source is greater than the available space from
/// <paramref name="index"/> to the end of the destination <paramref name="array"/>.</exception>
public void CopyTo(T[] array, int index)
{
    if (array == null)
    {
        throw new ArgumentNullException("array", SR.GetString(SR.ConcurrentBag_CopyTo_ArgumentNullException));
    }
    if (index < 0)
    {
        throw new ArgumentOutOfRangeException
            ("index", SR.GetString(SR.ConcurrentBag_CopyTo_ArgumentOutOfRangeException));
    }

    // Short path if the bag is empty
    if (m_headList == null)
        return;

    bool lockTaken = false;
    try
    {
        FreezeBag(ref lockTaken);
        ToList().CopyTo(array, index);
    }
    finally
    {
        UnfreezeBag(lockTaken);
    }
}

/// <summary>
/// Copies the elements of the <see cref="ConcurrentBag{T}"/> to an
/// <see cref="T:System.Array"/>, starting at a particular index.
/// </summary>
/// <param name="array">The one-dimensional <see cref="T:System.Array">Array</see> that is the
/// destination of the elements copied from the bag. The Array must have zero-based indexing.</param>
/// <param name="index">The zero-based index in <paramref name="array"/> at which copying
/// begins.</param>
/// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in
/// Visual Basic).</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than
/// zero.</exception>
/// <exception cref="ArgumentException">
/// <paramref name="array"/> is multidimensional. -or-
/// <paramref name="array"/> does not have zero-based indexing. -or-
/// <paramref name="index"/> is equal to or greater than the length of the <paramref name="array"/>
/// -or- The number of elements in the source is greater than the available space from
/// <paramref name="index"/> to the end of the destination <paramref name="array"/>. -or-
/// The type of the source cannot be cast automatically to the type of the destination
/// <paramref name="array"/>.
/// </exception>
void ICollection.CopyTo(Array array, int index)
{
    if (array == null)
    {
        throw new ArgumentNullException("array", SR.GetString(SR.ConcurrentBag_CopyTo_ArgumentNullException));
    }

    bool lockTaken = false;
    try
    {
        FreezeBag(ref lockTaken);
        // The remaining argument validation is left to the snapshot list's own CopyTo.
        ((ICollection)ToList()).CopyTo(array, index);
    }
    finally
    {
        UnfreezeBag(lockTaken);
    }
}

/// <summary>
/// Copies the <see cref="ConcurrentBag{T}"/> elements to a new array.
/// </summary>
/// <returns>A new array containing a snapshot of elements copied from the bag.</returns>
public T[] ToArray()
{
    // Short path if the bag is empty
    if (m_headList == null)
        return new T[0];

    bool lockTaken = false;
    try
    {
        FreezeBag(ref lockTaken);
        return ToList().ToArray();
    }
    finally
    {
        UnfreezeBag(lockTaken);
    }
}

/// <summary>
/// Returns an enumerator that iterates through the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <returns>An enumerator for the contents of the bag.</returns>
/// <remarks>
/// The enumeration represents a moment-in-time snapshot of the contents
/// of the bag. It does not reflect any updates to the collection after
/// <see cref="GetEnumerator"/> was called. The enumerator is safe to use
/// concurrently with reads from and writes to the bag.
/// </remarks>
public IEnumerator<T> GetEnumerator()
{
    // Copy the items to a new array and enumerate the array
    T[] array = ToArray();
    return ((IEnumerable<T>)array).GetEnumerator();
}

/// <summary>
/// Returns an enumerator that iterates through the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <returns>An enumerator for the contents of the bag.</returns>
/// <remarks>
/// The items enumerated represent a moment-in-time snapshot of the contents
/// of the bag. It does not reflect any update to the collection after
/// <see cref="GetEnumerator"/> was called.
/// </remarks>
IEnumerator IEnumerable.GetEnumerator()
{
    return ((ConcurrentBag<T>)this).GetEnumerator();
}

/// <summary>
/// Get the data array to be serialized
/// </summary>
[OnSerializing]
private void OnSerializing(StreamingContext context)
{
    // save the data into the serialization array to be saved
    m_serializationArray = ToArray();
}

/// <summary>
/// Reconstructs the bag from previously serialized data
/// </summary>
[OnDeserialized]
private void OnDeserialized(StreamingContext context)
{
    // The thread-local storage and the global lock are [NonSerialized], so recreate them first.
    m_locals = new ThreadLocal<ThreadLocalList>();
    m_globalListsLock = new object();

    // All deserialized items go into the list of the deserializing thread.
    ThreadLocalList list = GetThreadList(true);
    foreach (T item in m_serializationArray)
    {
        AddInternal(list, item);
    }
    m_headList = list;
    m_tailList = list;

    m_serializationArray = null;
}

/// <summary>
/// Gets the number of elements contained in the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <value>The number of elements contained in the bag.</value>
/// <remarks>
/// The count returned represents a moment-in-time snapshot of the contents
/// of the bag. It does not reflect any updates to the collection after
/// <see cref="Count"/> was called.
/// </remarks>
public int Count
{
    get
    {
        // Short path if the bag is empty
        if (m_headList == null)
            return 0;

        bool lockTaken = false;
        try
        {
            FreezeBag(ref lockTaken);
            return GetCountInternal();
        }
        finally
        {
            UnfreezeBag(lockTaken);
        }
    }
}

/// <summary>
/// Gets a value that indicates whether the <see cref="ConcurrentBag{T}"/> is empty.
/// </summary>
/// <value>true if the bag is empty; otherwise, false.</value>
public bool IsEmpty
{
    get
    {
        if (m_headList == null)
            return true;

        bool lockTaken = false;
        try
        {
            FreezeBag(ref lockTaken);
            ThreadLocalList currentList = m_headList;
            while (currentList != null)
            {
                if (currentList.m_head != null)
                {
                    // at least this list is not empty, we return false
                    return false;
                }
                currentList = currentList.m_nextList;
            }
            return true;
        }
        finally
        {
            UnfreezeBag(lockTaken);
        }
    }
}

/// <summary>
/// Gets a value indicating whether access to the <see cref="T:System.Collections.ICollection"/> is
/// synchronized with the SyncRoot.
/// </summary>
/// <value>true if access to the <see cref="T:System.Collections.ICollection"/> is synchronized
/// with the SyncRoot; otherwise, false. For <see cref="ConcurrentBag{T}"/>, this property always
/// returns false.</value>
bool ICollection.IsSynchronized
{
    get { return false; }
}

/// <summary>
/// Gets an object that can be used to synchronize access to the
/// <see cref="T:System.Collections.ICollection"/>. This property is not supported.
/// </summary>
/// <exception cref="T:System.NotSupportedException">The SyncRoot property is not supported.</exception>
object ICollection.SyncRoot
{
    get
    {
        throw new NotSupportedException(SR.GetString(SR.ConcurrentCollection_SyncRoot_NotSupported));
    }
}

#region Freeze bag helper methods

/// <summary>
/// Local helper method to freeze all bag operations, it
/// 1- Acquires the global lock to prevent any other thread from freezing the bag, and also
///    to prevent a new thread list from being added
/// 2- Then acquires all local lists locks to prevent steal and synchronized operations
/// 3- Waits for all un-synchronized operations to be done
/// </summary>
/// <param name="lockTaken">Retrieve the lock taken result for the global lock, to be passed to Unfreeze method</param>
private void FreezeBag(ref bool lockTaken)
{
    // global lock to be safe against multi threads calls count and corrupt m_needSync
    Monitor.Enter(m_globalListsLock, ref lockTaken);

    // This will force any future add/take operation to be synchronized
    m_needSync = true;

    // Acquire all local lists locks
    AcquireAllLocks();

    // Wait for all un-synchronized operation to be done
    WaitAllOperations();
}

/// <summary>
/// Local helper method to unfreeze the bag from a frozen state
/// </summary>
/// <param name="lockTaken">The lock taken result from the Freeze method</param>
private void UnfreezeBag(bool lockTaken)
{
    ReleaseAllLocks();
    m_needSync = false;
    if (lockTaken)
    {
        Monitor.Exit(m_globalListsLock);
    }
}

/// <summary>
/// Local helper method to acquire all local lists locks
/// </summary>
private void AcquireAllLocks()
{
    bool lockTaken = false;
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        // Try/Finally block to avoid thread abort between acquiring the lock and setting the taken flag
        try
        {
            Monitor.Enter(currentList, ref lockTaken);
        }
        finally
        {
            if (lockTaken)
            {
                // Record the acquisition on the list itself so ReleaseAllLocks knows what to release,
                // then reset the local flag for the next list.
                currentList.m_lockTaken = true;
                lockTaken = false;
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// Local helper method to release all local lists locks
/// </summary>
private void ReleaseAllLocks()
{
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        if (currentList.m_lockTaken)
        {
            currentList.m_lockTaken = false;
            Monitor.Exit(currentList);
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// Local helper function to wait for all unsynchronized operations
/// </summary>
private void WaitAllOperations()
{
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        if (currentList.m_currentOp != (int)ListOperation.None)
        {
            SpinWait spinner = new SpinWait();
            while (currentList.m_currentOp != (int)ListOperation.None)
            {
                spinner.SpinOnce();
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// Local helper function to get the bag count, the caller should call it from Freeze/Unfreeze block
/// </summary>
/// <returns>The current bag count</returns>
private int GetCountInternal()
{
    int count = 0;
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        // checked: overflow of the running total raises instead of wrapping
        checked
        {
            count += currentList.Count;
        }
        currentList = currentList.m_nextList;
    }
    return count;
}

/// <summary>
/// Local helper function to return the bag items in a list, this is mainly used by CopyTo and ToArray
/// This is not thread safe, should be called in Freeze/UnFreeze bag block
/// </summary>
/// <returns>List that contains the bag items</returns>
private List<T> ToList()
{
    List<T> list = new List<T>();
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        // Walk each per-thread list from head to tail.
        Node currentNode = currentList.m_head;
        while (currentNode != null)
        {
            list.Add(currentNode.m_value);
            currentNode = currentNode.m_next;
        }
        currentList = currentList.m_nextList;
    }

    return list;
}

#endregion

#region Inner Classes

/// <summary>
/// A class that represents a node in the lock thread list
/// </summary>
[Serializable]
internal class Node
{
    public Node(T value)
    {
        m_value = value;
    }
    public T m_value;
    public Node m_next;
    public Node m_prev;
}

/// <summary>
/// A class that represents the lock thread list: a doubly linked list of nodes where
/// Add/Remove/Peek work on the head and Steal works on the tail.
/// </summary>
internal class ThreadLocalList
{
    // Head node in the list, null means the list is empty
    internal Node m_head;

    // Tail node for the list
    private Node m_tail;

    // The current list operation
    internal volatile int m_currentOp;

    // The list count from the Add/Take perspective
    private int m_count;

    // The stealing count
    internal int m_stealCount;

    // Next list in the dictionary values
    internal ThreadLocalList m_nextList;

    // Set if the local lock is taken
    internal bool m_lockTaken;

    // The owner thread for this list
    internal Thread m_ownerThread;

    // the version of the list, incremented only when the list changed from empty to non empty state
    internal volatile int m_version;

    /// <summary>
    /// ThreadLocalList constructor
    /// </summary>
    /// <param name="ownerThread">The owner thread for this list</param>
    internal ThreadLocalList(Thread ownerThread)
    {
        m_ownerThread = ownerThread;
    }

    /// <summary>
    /// Add new item to head of the list
    /// </summary>
    /// <param name="item">The item to add.</param>
    /// <param name="updateCount">Whether to update the count.</param>
    internal void Add(T item, bool updateCount)
    {
        checked
        {
            m_count++;
        }
        Node node = new Node(item);
        if (m_head == null)
        {
            Debug.Assert(m_tail == null);
            m_head = node;
            m_tail = node;
            m_version++; // changing from empty state to non empty state
        }
        else
        {
            node.m_next = m_head;
            m_head.m_prev = node;
            m_head = node;
        }
        if (updateCount) // update the count to avoid overflow if this add is synchronized
        {
            // Fold the steal count into m_count so both counters stay small.
            m_count = m_count - m_stealCount;
            m_stealCount = 0;
        }
    }

    /// <summary>
    /// Remove an item from the head of the list
    /// </summary>
    /// <param name="result">The removed item</param>
    internal void Remove(out T result)
    {
        Debug.Assert(m_head != null);
        Node head = m_head;
        m_head = m_head.m_next;
        if (m_head != null)
        {
            m_head.m_prev = null;
        }
        else
        {
            // The list became empty, so there is no tail either.
            m_tail = null;
        }
        m_count--;
        result = head.m_value;
    }

    /// <summary>
    /// Peek an item from the head of the list
    /// </summary>
    /// <param name="result">the peeked item</param>
    /// <returns>True if succeeded, false otherwise</returns>
    internal bool Peek(out T result)
    {
        Node head = m_head;
        if (head != null)
        {
            result = head.m_value;
            return true;
        }
        result = default(T);
        return false;
    }

    /// <summary>
    /// Steal an item from the tail of the list
    /// </summary>
    /// <param name="result">the removed item</param>
    /// <param name="remove">remove or peek flag</param>
    internal void Steal(out T result, bool remove)
    {
        Node tail = m_tail;
        Debug.Assert(tail != null);
        if (remove) // Take operation
        {
            m_tail = m_tail.m_prev;
            if (m_tail != null)
            {
                m_tail.m_next = null;
            }
            else
            {
                // The stolen node was the only one, so the list is now empty.
                m_head = null;
            }
            // Increment the steal count
            m_stealCount++;
        }
        result = tail.m_value;
    }

    /// <summary>
    /// Gets the total list count, it's not thread safe, may provide incorrect count if it is called concurrently
    /// </summary>
    internal int Count
    {
        get
        {
            return m_count - m_stealCount;
        }
    }
}

/// <summary>
/// List operations
/// </summary>
internal enum ListOperation
{
    None,
    Add,
    Take
};

#endregion
} // end of class ConcurrentBag<T>

#region Internal Types

/// <summary>
/// A simple class for the debugger view window
/// </summary>
internal sealed class SystemThreadingCollection_IProducerConsumerCollectionDebugView<T>
{
    // The collection being viewed.
    IProducerConsumerCollection<T> m_collection;

    /// <summary>
    /// Creates a debugger view over the given collection.
    /// </summary>
    /// <param name="collection">The collection to view; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="collection"/> is null.</exception>
    public SystemThreadingCollection_IProducerConsumerCollectionDebugView(IProducerConsumerCollection<T> collection)
    {
        if (collection == null)
        {
            throw new ArgumentNullException("collection");
        }

        m_collection = collection;
    }

    /// <summary>
    /// Gets the items of the viewed collection, as produced by its ToArray method.
    /// </summary>
    public T[] Items
    {
        get { return m_collection.ToArray(); }
    }
}

#endregion
} // end of namespace System.Collections.Concurrent

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- SEHException.cs
- TdsParserHelperClasses.cs
- InputReferenceExpression.cs
- ConfigXmlCDataSection.cs
- ContainerParaClient.cs
- CqlParser.cs
- SystemBrushes.cs
- ToolboxService.cs
- SiteMapPath.cs
- ApplicationFileParser.cs
- TraceData.cs
- XsltQilFactory.cs
- VideoDrawing.cs
- FontDialog.cs
- SingleKeyFrameCollection.cs
- LinearKeyFrames.cs
- LingerOption.cs
- XPathNavigatorKeyComparer.cs
- ADMembershipUser.cs
- ToolStripContextMenu.cs
- MappingMetadataHelper.cs
- _Events.cs
- TcpStreams.cs
- Char.cs
- WorkflowWebHostingModule.cs
- SortKey.cs
- FixUp.cs
- RegistryPermission.cs
- GeneralTransformCollection.cs
- ListContractAdapter.cs
- MorphHelper.cs
- EntityConnection.cs
- CreateUserWizard.cs
- BooleanAnimationBase.cs
- UIPropertyMetadata.cs
- WebPartCatalogCloseVerb.cs
- TextEditorMouse.cs
- DPTypeDescriptorContext.cs
- GeometryCollection.cs
- WebCategoryAttribute.cs
- CodeDOMProvider.cs
- __ConsoleStream.cs
- CommandManager.cs
- BasePattern.cs
- JoinElimination.cs
- SyncOperationState.cs
- HelpKeywordAttribute.cs
- FormViewPageEventArgs.cs
- Crypto.cs
- DataReceivedEventArgs.cs
- PackageFilter.cs
- ActivityInstanceMap.cs
- Compilation.cs
- ToolZone.cs
- WebConfigurationManager.cs
- TreeNodeCollection.cs
- HelpHtmlBuilder.cs
- Point.cs
- ContentHostHelper.cs
- QilName.cs
- HtmlLink.cs
- InfoCardRSAPKCS1SignatureDeformatter.cs
- LockRecursionException.cs
- Polyline.cs
- KeyedCollection.cs
- CustomUserNameSecurityTokenAuthenticator.cs
- Maps.cs
- ServiceNotStartedException.cs
- LocalizableResourceBuilder.cs
- XmlNodeChangedEventArgs.cs
- AspCompat.cs
- InfoCardRSAPKCS1SignatureFormatter.cs
- ScriptComponentDescriptor.cs
- TraceUtility.cs
- BezierSegment.cs
- RTLAwareMessageBox.cs
- DataQuery.cs
- HtmlInputCheckBox.cs
- XsltLibrary.cs
- Table.cs
- AssemblyInfo.cs
- WebServiceClientProxyGenerator.cs
- ObjectComplexPropertyMapping.cs
- HttpHandlerAction.cs
- ObjectStateFormatter.cs
- xmlformatgeneratorstatics.cs
- EventPrivateKey.cs
- MonthChangedEventArgs.cs
- XmlCharType.cs
- DataSourceGroupCollection.cs
- TabControl.cs
- Annotation.cs
- EdmPropertyAttribute.cs
- AttributeQuery.cs
- CodeGen.cs
- UInt32.cs
- HttpContextWrapper.cs
- DefaultHttpHandler.cs
- OdbcTransaction.cs
- ReflectionTypeLoadException.cs