Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / tx / System / Transactions / TransactionTable.cs / 1305376 / TransactionTable.cs
//------------------------------------------------------------------------------ //// Copyright (c) Microsoft Corporation. All rights reserved. // //----------------------------------------------------------------------------- namespace System.Transactions { using System; using System.Diagnostics; using System.Threading; class CheapUnfairReaderWriterLock { object writerFinishedEvent; int readersIn; int readersOut; bool writerPresent; object syncRoot; // Spin lock params const int MAX_SPIN_COUNT = 100; const int SLEEP_TIME = 500; public CheapUnfairReaderWriterLock() { } object SyncRoot { get { if( this.syncRoot == null ) { Interlocked.CompareExchange( ref this.syncRoot, new object(), null ); } return this.syncRoot; } } bool ReadersPresent { get { return this.readersIn != this.readersOut; } } ManualResetEvent WriterFinishedEvent { get { if( this.writerFinishedEvent == null ) { Interlocked.CompareExchange( ref this.writerFinishedEvent, new ManualResetEvent( true ), null ); } return (ManualResetEvent)this.writerFinishedEvent; } } public int AcquireReaderLock() { int readerIndex = 0; do { if( this.writerPresent ) { WriterFinishedEvent.WaitOne(); } readerIndex = Interlocked.Increment( ref this.readersIn ); if( !this.writerPresent ) { break; } Interlocked.Decrement( ref this.readersIn ); } while( true ); return readerIndex; } public void AcquireWriterLock() { #pragma warning disable 0618 //@ Monitor.Enter(this.SyncRoot); #pragma warning restore 0618 this.writerPresent = true; this.WriterFinishedEvent.Reset(); do { int i = 0; while( ReadersPresent && i < MAX_SPIN_COUNT ) { Thread.Sleep( 0 ); i++; } if( ReadersPresent ) { Thread.Sleep( SLEEP_TIME ); } } while( ReadersPresent ); } public void ReleaseReaderLock() { Interlocked.Increment( ref this.readersOut ); } public void ReleaseWriterLock() { try { this.writerPresent = false; this.WriterFinishedEvent.Set(); } finally { Monitor.Exit( this.SyncRoot ); } } } // This transaction table implementation uses an array of lists to avoid contention. The list for a // transaction is decided by its hashcode. class TransactionTable { // Use a timer to initiate looking for transactions that have timed out. System.Threading.Timer timer; // Private storage noting if the timer is enabled. bool timerEnabled; // Store the timer interval const int timerInternalExponent = 9; int timerInterval; // Store the number of ticks. A tick is a mark of 1 timer interval. By counting ticks // we can avoid expensive calls to get the current time for every transaction creation. const long TicksPerMillisecond = 10000; long ticks; Int64 lastTimerTime; // Sets of arrays of transactions. BucketSet headBucketSet; // Synchronize adding transactions with shutting off the timer and started events. CheapUnfairReaderWriterLock rwLock; internal TransactionTable() { // Create a timer that is initially disabled by specifing an Infinite time to the first interval this.timer = new Timer( new TimerCallback(ThreadTimer), null, Timeout.Infinite, this.timerInterval ); // Note that the timer is disabled this.timerEnabled = false; // Store the timer interval this.timerInterval = 1 << TransactionTable.timerInternalExponent; // Ticks start off at zero. this.ticks = 0; // The head of the list is long.MaxValue. It contains all of the transactions that for // some reason or other don't have a timeout. this.headBucketSet = new BucketSet( this, long.MaxValue ); // Allocate the lock rwLock = new CheapUnfairReaderWriterLock(); } // Calculate the maximum number of ticks for which this transaction should live internal long TimeoutTicks( TimeSpan timeout ) { if( timeout != TimeSpan.Zero ) { // Note: At the current setting of approximately 2 ticks per second this timer will // wrap in approximately 2^64/2/60/60/24/365=292,471,208,677.5360162195585996 // (nearly 300 billion) years. long timeoutTicks = ((timeout.Ticks / TimeSpan.TicksPerMillisecond) >> TransactionTable.timerInternalExponent) + this.ticks; return timeoutTicks; } else { return long.MaxValue; } } // Absolute timeout internal TimeSpan RecalcTimeout( InternalTransaction tx ) { return TimeSpan.FromMilliseconds( (tx.AbsoluteTimeout - this.ticks) * this.timerInterval ); } // Creation time private Int64 CurrentTime { get { if( this.timerEnabled ) { return this.lastTimerTime; } else { return DateTime.UtcNow.Ticks; } } } // Add a transaction to the table. Transactions are added to the end of the list in sorted order based on their // absolute timeout. internal int Add( InternalTransaction txNew ) { // Tell the runtime that we are modifying global state. Thread.BeginCriticalRegion(); int readerIndex = 0; try { readerIndex = rwLock.AcquireReaderLock(); try { // Start the timer if needed before checking the current time since the current // time can be more efficient with a running timer. if( txNew.AbsoluteTimeout != long.MaxValue ) { if( !this.timerEnabled ) { if( !this.timer.Change( this.timerInterval, this.timerInterval )) { throw TransactionException.CreateInvalidOperationException( SR.GetString( SR.TraceSourceLtm ), SR.GetString(SR.UnexpectedTimerFailure), null ); } this.lastTimerTime = DateTime.UtcNow.Ticks; this.timerEnabled = true; } } txNew.CreationTime = CurrentTime; AddIter( txNew ); } finally { rwLock.ReleaseReaderLock(); } } finally { Thread.EndCriticalRegion(); } return readerIndex; } void AddIter( InternalTransaction txNew ) { // // Theory of operation. // // Note that the head bucket contains any transaction with essentially infinite // timeout (long.MaxValue). The list is sorted in decending order. To add // a node the code must walk down the list looking for a set of bucket that matches // the absolute timeout value for the transaction. When it is found it passes // the insert down to that set. // // An importent thing to note about the list is that forward links are all weak // references and reverse links are all strong references. This allows the GC // to clean up old links in the list so that they don't need to be removed manually. // However if there is still a rooted strong reference to an old link in the // chain that link wont fall off the list because there is a strong reference held // forward. // BucketSet currentBucketSet = this.headBucketSet; while( currentBucketSet.AbsoluteTimeout != txNew.AbsoluteTimeout ) { BucketSet lastBucketSet = null; do { WeakReference nextSetWeak = (WeakReference)currentBucketSet.nextSetWeak; BucketSet nextBucketSet = null; if( nextSetWeak != null ) { nextBucketSet = (BucketSet)nextSetWeak.Target; } if( nextBucketSet == null ) { // // We've reached the end of the list either because nextSetWeak was null or // because its reference was collected. This code doesn't care. Make a new // set, attempt to attach it and move on. // BucketSet newBucketSet = new BucketSet( this, txNew.AbsoluteTimeout ); WeakReference newSetWeak = new WeakReference( newBucketSet ); WeakReference oldNextSetWeak = (WeakReference)Interlocked.CompareExchange( ref currentBucketSet.nextSetWeak, newSetWeak, nextSetWeak ); if( oldNextSetWeak == nextSetWeak ) { // Ladies and Gentlemen we have a winner. newBucketSet.prevSet = currentBucketSet; } // Note that at this point we don't update currentBucketSet. On the next loop // iteration we should be able to pick up where we left off. } else { lastBucketSet = currentBucketSet; currentBucketSet = nextBucketSet; } }while( currentBucketSet.AbsoluteTimeout > txNew.AbsoluteTimeout ); if( currentBucketSet.AbsoluteTimeout != txNew.AbsoluteTimeout ) { // // Getting to here means that we've found a slot in the list where this bucket set should go. // BucketSet newBucketSet = new BucketSet( this, txNew.AbsoluteTimeout ); WeakReference newSetWeak = new WeakReference( newBucketSet ); newBucketSet.nextSetWeak = lastBucketSet.nextSetWeak; WeakReference oldNextSetWeak = (WeakReference)Interlocked.CompareExchange( ref lastBucketSet.nextSetWeak, newSetWeak, newBucketSet.nextSetWeak ); if( oldNextSetWeak == newBucketSet.nextSetWeak ) { // Ladies and Gentlemen we have a winner. if( oldNextSetWeak != null ) { BucketSet oldSet = (BucketSet)oldNextSetWeak.Target; if( oldSet != null ) { // prev references are just there to root things for the GC. If this object is // gone we don't really care. oldSet.prevSet = newBucketSet; } } newBucketSet.prevSet = currentBucketSet; } currentBucketSet = lastBucketSet; lastBucketSet = null; // The outer loop will iterate and pick up where we left off. } } // // Great we found a spot. // currentBucketSet.Add( txNew ); } // Remove a transaction from the table. internal void Remove( InternalTransaction tx ) { tx.tableBucket.Remove( tx ); tx.tableBucket = null; } // Process a timer event private void ThreadTimer( Object state ) { // // Theory of operation. // // To timeout transactions we must walk down the list starting from the head // until we find a link with an absolute timeout that is greater than our own. // At that point everything further down in the list is elegable to be timed // out. So simply remove that link in the list and walk down from that point // timing out any transaction that is found. // // There could be a ---- between this callback being queued and the timer // being disabled. If we get here when the timer is disabled, just return. if ( !this.timerEnabled ) { return; } // Increment the number of ticks this.ticks++; this.lastTimerTime = DateTime.UtcNow.Ticks; // // First find the starting point of transactions that should time out. Every transaction after // that point will timeout so once we've found it then it is just a matter of traversing the // structure. // BucketSet lastBucketSet = null; BucketSet currentBucketSet = this.headBucketSet; // The list always has a head. WeakReference nextWeakSet = (WeakReference)currentBucketSet.nextSetWeak; BucketSet nextBucketSet = null; if( nextWeakSet != null ) { nextBucketSet = (BucketSet)nextWeakSet.Target; } if( nextBucketSet == null ) { // // Special case to allow for disabling the timer. // this.rwLock.AcquireWriterLock(); try { // If there are no transactions on the timeout list we can disable the // timer. if( !this.timer.Change( Timeout.Infinite, Timeout.Infinite )) { throw TransactionException.CreateInvalidOperationException( SR.GetString( SR.TraceSourceLtm ), SR.GetString(SR.UnexpectedTimerFailure), null ); } this.timerEnabled = false; return; } finally { this.rwLock.ReleaseWriterLock(); } } // Note it is slightly subtle that we always skip the head node. This is done // on purpose because the head node contains transactions with essentially // an infinite timeout. do { do { nextWeakSet = (WeakReference)currentBucketSet.nextSetWeak; if( nextWeakSet == null ) { // Nothing more to do. return; } nextBucketSet = (BucketSet)nextWeakSet.Target; if( nextBucketSet == null ) { // Again nothing more to do. return; } lastBucketSet = currentBucketSet; currentBucketSet = nextBucketSet; } while( currentBucketSet.AbsoluteTimeout > this.ticks ); // Tell the runtime that we are modifying global state. Thread.BeginCriticalRegion(); try { // // Pinch off the list at this point making sure it is still the correct set. // WeakReference abortingSetsWeak = (WeakReference)Interlocked.CompareExchange( ref lastBucketSet.nextSetWeak, null, nextWeakSet ); if( abortingSetsWeak == nextWeakSet ) { // Yea - now proceed to abort the transactions. BucketSet abortingBucketSets = null; do { if( abortingSetsWeak != null ) { abortingBucketSets = (BucketSet)abortingSetsWeak.Target; } else { abortingBucketSets = null; } if( abortingBucketSets != null ) { abortingBucketSets.TimeoutTransactions(); abortingSetsWeak = (WeakReference)abortingBucketSets.nextSetWeak; } } while( abortingBucketSets != null ); // That's all we needed to do. break; } } finally { Thread.EndCriticalRegion(); } // We missed pulling the right transactions off. Loop back up and try again. currentBucketSet = lastBucketSet; }while( true ); } } class BucketSet { // Buckets are kept in sets. Each element of a set will have the same absoluteTimeout. internal object nextSetWeak; internal BucketSet prevSet; TransactionTable table; long absoluteTimeout; internal Bucket headBucket; internal BucketSet( TransactionTable table, long absoluteTimeout ) { this.headBucket = new Bucket( this ); this.table = table; this.absoluteTimeout = absoluteTimeout; } internal long AbsoluteTimeout { get { return this.absoluteTimeout; } } internal void Add( InternalTransaction newTx ) { while( !this.headBucket.Add( newTx )) ; } internal void TimeoutTransactions() { Bucket currentBucket = this.headBucket; // It will always have a head. do { currentBucket.TimeoutTransactions(); WeakReference nextWeakBucket = (WeakReference)currentBucket.nextBucketWeak; if( nextWeakBucket != null ) { currentBucket = (Bucket)nextWeakBucket.Target; } else { currentBucket = null; } } while( currentBucket != null ); } } class Bucket { bool timedOut; int index; int size; InternalTransaction [] transactions; internal WeakReference nextBucketWeak; Bucket previous; BucketSet owningSet; internal Bucket( BucketSet owningSet ) { this.timedOut = false; this.index = -1; this.size = 1024; // A possible design change here is to have this scale dynamically based on load. transactions = new InternalTransaction [this.size]; this.owningSet = owningSet; } internal bool Add( InternalTransaction tx ) { int currentIndex = Interlocked.Increment( ref this.index ); if( currentIndex < this.size ) { tx.tableBucket = this; tx.bucketIndex = currentIndex; Thread.MemoryBarrier(); // This data must be written before the transaction // could be timed out. this.transactions [currentIndex] = tx; if( this.timedOut ) { lock( tx ) { tx.State.Timeout( tx ); } } } else { Bucket newBucket = new Bucket( this.owningSet ); newBucket.nextBucketWeak = new WeakReference( this ); Bucket oldBucket = Interlocked.CompareExchange( ref this.owningSet.headBucket, newBucket, this ); if( oldBucket == this ) { // ladies and gentlemen we have a winner. this.previous = newBucket; } return false; } return true; } internal void Remove( InternalTransaction tx ) { this.transactions [tx.bucketIndex] = null; } internal void TimeoutTransactions() { int i; int transactionCount = this.index; this.timedOut = true; Thread.MemoryBarrier(); for( i = 0; i <= transactionCount && i < this.size; i++ ) { Debug.Assert( transactionCount == this.index, "Index changed timing out transactions" ); InternalTransaction tx = this.transactions [i]; if( tx != null ) { lock( tx ) { tx.State.Timeout( tx ); } } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. //------------------------------------------------------------------------------ //// Copyright (c) Microsoft Corporation. All rights reserved. // //----------------------------------------------------------------------------- namespace System.Transactions { using System; using System.Diagnostics; using System.Threading; class CheapUnfairReaderWriterLock { object writerFinishedEvent; int readersIn; int readersOut; bool writerPresent; object syncRoot; // Spin lock params const int MAX_SPIN_COUNT = 100; const int SLEEP_TIME = 500; public CheapUnfairReaderWriterLock() { } object SyncRoot { get { if( this.syncRoot == null ) { Interlocked.CompareExchange( ref this.syncRoot, new object(), null ); } return this.syncRoot; } } bool ReadersPresent { get { return this.readersIn != this.readersOut; } } ManualResetEvent WriterFinishedEvent { get { if( this.writerFinishedEvent == null ) { Interlocked.CompareExchange( ref this.writerFinishedEvent, new ManualResetEvent( true ), null ); } return (ManualResetEvent)this.writerFinishedEvent; } } public int AcquireReaderLock() { int readerIndex = 0; do { if( this.writerPresent ) { WriterFinishedEvent.WaitOne(); } readerIndex = Interlocked.Increment( ref this.readersIn ); if( !this.writerPresent ) { break; } Interlocked.Decrement( ref this.readersIn ); } while( true ); return readerIndex; } public void AcquireWriterLock() { #pragma warning disable 0618 //@ Monitor.Enter(this.SyncRoot); #pragma warning restore 0618 this.writerPresent = true; this.WriterFinishedEvent.Reset(); do { int i = 0; while( ReadersPresent && i < MAX_SPIN_COUNT ) { Thread.Sleep( 0 ); i++; } if( ReadersPresent ) { Thread.Sleep( SLEEP_TIME ); } } while( ReadersPresent ); } public void ReleaseReaderLock() { Interlocked.Increment( ref this.readersOut ); } public void ReleaseWriterLock() { try { this.writerPresent = false; this.WriterFinishedEvent.Set(); } finally { Monitor.Exit( this.SyncRoot ); } } } // This transaction table implementation uses an array of lists to avoid contention. The list for a // transaction is decided by its hashcode. class TransactionTable { // Use a timer to initiate looking for transactions that have timed out. System.Threading.Timer timer; // Private storage noting if the timer is enabled. bool timerEnabled; // Store the timer interval const int timerInternalExponent = 9; int timerInterval; // Store the number of ticks. A tick is a mark of 1 timer interval. By counting ticks // we can avoid expensive calls to get the current time for every transaction creation. const long TicksPerMillisecond = 10000; long ticks; Int64 lastTimerTime; // Sets of arrays of transactions. BucketSet headBucketSet; // Synchronize adding transactions with shutting off the timer and started events. CheapUnfairReaderWriterLock rwLock; internal TransactionTable() { // Create a timer that is initially disabled by specifing an Infinite time to the first interval this.timer = new Timer( new TimerCallback(ThreadTimer), null, Timeout.Infinite, this.timerInterval ); // Note that the timer is disabled this.timerEnabled = false; // Store the timer interval this.timerInterval = 1 << TransactionTable.timerInternalExponent; // Ticks start off at zero. this.ticks = 0; // The head of the list is long.MaxValue. It contains all of the transactions that for // some reason or other don't have a timeout. this.headBucketSet = new BucketSet( this, long.MaxValue ); // Allocate the lock rwLock = new CheapUnfairReaderWriterLock(); } // Calculate the maximum number of ticks for which this transaction should live internal long TimeoutTicks( TimeSpan timeout ) { if( timeout != TimeSpan.Zero ) { // Note: At the current setting of approximately 2 ticks per second this timer will // wrap in approximately 2^64/2/60/60/24/365=292,471,208,677.5360162195585996 // (nearly 300 billion) years. long timeoutTicks = ((timeout.Ticks / TimeSpan.TicksPerMillisecond) >> TransactionTable.timerInternalExponent) + this.ticks; return timeoutTicks; } else { return long.MaxValue; } } // Absolute timeout internal TimeSpan RecalcTimeout( InternalTransaction tx ) { return TimeSpan.FromMilliseconds( (tx.AbsoluteTimeout - this.ticks) * this.timerInterval ); } // Creation time private Int64 CurrentTime { get { if( this.timerEnabled ) { return this.lastTimerTime; } else { return DateTime.UtcNow.Ticks; } } } // Add a transaction to the table. Transactions are added to the end of the list in sorted order based on their // absolute timeout. internal int Add( InternalTransaction txNew ) { // Tell the runtime that we are modifying global state. Thread.BeginCriticalRegion(); int readerIndex = 0; try { readerIndex = rwLock.AcquireReaderLock(); try { // Start the timer if needed before checking the current time since the current // time can be more efficient with a running timer. if( txNew.AbsoluteTimeout != long.MaxValue ) { if( !this.timerEnabled ) { if( !this.timer.Change( this.timerInterval, this.timerInterval )) { throw TransactionException.CreateInvalidOperationException( SR.GetString( SR.TraceSourceLtm ), SR.GetString(SR.UnexpectedTimerFailure), null ); } this.lastTimerTime = DateTime.UtcNow.Ticks; this.timerEnabled = true; } } txNew.CreationTime = CurrentTime; AddIter( txNew ); } finally { rwLock.ReleaseReaderLock(); } } finally { Thread.EndCriticalRegion(); } return readerIndex; } void AddIter( InternalTransaction txNew ) { // // Theory of operation. // // Note that the head bucket contains any transaction with essentially infinite // timeout (long.MaxValue). The list is sorted in decending order. To add // a node the code must walk down the list looking for a set of bucket that matches // the absolute timeout value for the transaction. When it is found it passes // the insert down to that set. // // An importent thing to note about the list is that forward links are all weak // references and reverse links are all strong references. This allows the GC // to clean up old links in the list so that they don't need to be removed manually. // However if there is still a rooted strong reference to an old link in the // chain that link wont fall off the list because there is a strong reference held // forward. // BucketSet currentBucketSet = this.headBucketSet; while( currentBucketSet.AbsoluteTimeout != txNew.AbsoluteTimeout ) { BucketSet lastBucketSet = null; do { WeakReference nextSetWeak = (WeakReference)currentBucketSet.nextSetWeak; BucketSet nextBucketSet = null; if( nextSetWeak != null ) { nextBucketSet = (BucketSet)nextSetWeak.Target; } if( nextBucketSet == null ) { // // We've reached the end of the list either because nextSetWeak was null or // because its reference was collected. This code doesn't care. Make a new // set, attempt to attach it and move on. // BucketSet newBucketSet = new BucketSet( this, txNew.AbsoluteTimeout ); WeakReference newSetWeak = new WeakReference( newBucketSet ); WeakReference oldNextSetWeak = (WeakReference)Interlocked.CompareExchange( ref currentBucketSet.nextSetWeak, newSetWeak, nextSetWeak ); if( oldNextSetWeak == nextSetWeak ) { // Ladies and Gentlemen we have a winner. newBucketSet.prevSet = currentBucketSet; } // Note that at this point we don't update currentBucketSet. On the next loop // iteration we should be able to pick up where we left off. } else { lastBucketSet = currentBucketSet; currentBucketSet = nextBucketSet; } }while( currentBucketSet.AbsoluteTimeout > txNew.AbsoluteTimeout ); if( currentBucketSet.AbsoluteTimeout != txNew.AbsoluteTimeout ) { // // Getting to here means that we've found a slot in the list where this bucket set should go. // BucketSet newBucketSet = new BucketSet( this, txNew.AbsoluteTimeout ); WeakReference newSetWeak = new WeakReference( newBucketSet ); newBucketSet.nextSetWeak = lastBucketSet.nextSetWeak; WeakReference oldNextSetWeak = (WeakReference)Interlocked.CompareExchange( ref lastBucketSet.nextSetWeak, newSetWeak, newBucketSet.nextSetWeak ); if( oldNextSetWeak == newBucketSet.nextSetWeak ) { // Ladies and Gentlemen we have a winner. if( oldNextSetWeak != null ) { BucketSet oldSet = (BucketSet)oldNextSetWeak.Target; if( oldSet != null ) { // prev references are just there to root things for the GC. If this object is // gone we don't really care. oldSet.prevSet = newBucketSet; } } newBucketSet.prevSet = currentBucketSet; } currentBucketSet = lastBucketSet; lastBucketSet = null; // The outer loop will iterate and pick up where we left off. } } // // Great we found a spot. // currentBucketSet.Add( txNew ); } // Remove a transaction from the table. internal void Remove( InternalTransaction tx ) { tx.tableBucket.Remove( tx ); tx.tableBucket = null; } // Process a timer event private void ThreadTimer( Object state ) { // // Theory of operation. // // To timeout transactions we must walk down the list starting from the head // until we find a link with an absolute timeout that is greater than our own. // At that point everything further down in the list is elegable to be timed // out. So simply remove that link in the list and walk down from that point // timing out any transaction that is found. // // There could be a ---- between this callback being queued and the timer // being disabled. If we get here when the timer is disabled, just return. if ( !this.timerEnabled ) { return; } // Increment the number of ticks this.ticks++; this.lastTimerTime = DateTime.UtcNow.Ticks; // // First find the starting point of transactions that should time out. Every transaction after // that point will timeout so once we've found it then it is just a matter of traversing the // structure. // BucketSet lastBucketSet = null; BucketSet currentBucketSet = this.headBucketSet; // The list always has a head. WeakReference nextWeakSet = (WeakReference)currentBucketSet.nextSetWeak; BucketSet nextBucketSet = null; if( nextWeakSet != null ) { nextBucketSet = (BucketSet)nextWeakSet.Target; } if( nextBucketSet == null ) { // // Special case to allow for disabling the timer. // this.rwLock.AcquireWriterLock(); try { // If there are no transactions on the timeout list we can disable the // timer. if( !this.timer.Change( Timeout.Infinite, Timeout.Infinite )) { throw TransactionException.CreateInvalidOperationException( SR.GetString( SR.TraceSourceLtm ), SR.GetString(SR.UnexpectedTimerFailure), null ); } this.timerEnabled = false; return; } finally { this.rwLock.ReleaseWriterLock(); } } // Note it is slightly subtle that we always skip the head node. This is done // on purpose because the head node contains transactions with essentially // an infinite timeout. do { do { nextWeakSet = (WeakReference)currentBucketSet.nextSetWeak; if( nextWeakSet == null ) { // Nothing more to do. return; } nextBucketSet = (BucketSet)nextWeakSet.Target; if( nextBucketSet == null ) { // Again nothing more to do. return; } lastBucketSet = currentBucketSet; currentBucketSet = nextBucketSet; } while( currentBucketSet.AbsoluteTimeout > this.ticks ); // Tell the runtime that we are modifying global state. Thread.BeginCriticalRegion(); try { // // Pinch off the list at this point making sure it is still the correct set. // WeakReference abortingSetsWeak = (WeakReference)Interlocked.CompareExchange( ref lastBucketSet.nextSetWeak, null, nextWeakSet ); if( abortingSetsWeak == nextWeakSet ) { // Yea - now proceed to abort the transactions. BucketSet abortingBucketSets = null; do { if( abortingSetsWeak != null ) { abortingBucketSets = (BucketSet)abortingSetsWeak.Target; } else { abortingBucketSets = null; } if( abortingBucketSets != null ) { abortingBucketSets.TimeoutTransactions(); abortingSetsWeak = (WeakReference)abortingBucketSets.nextSetWeak; } } while( abortingBucketSets != null ); // That's all we needed to do. break; } } finally { Thread.EndCriticalRegion(); } // We missed pulling the right transactions off. Loop back up and try again. currentBucketSet = lastBucketSet; }while( true ); } } class BucketSet { // Buckets are kept in sets. Each element of a set will have the same absoluteTimeout. internal object nextSetWeak; internal BucketSet prevSet; TransactionTable table; long absoluteTimeout; internal Bucket headBucket; internal BucketSet( TransactionTable table, long absoluteTimeout ) { this.headBucket = new Bucket( this ); this.table = table; this.absoluteTimeout = absoluteTimeout; } internal long AbsoluteTimeout { get { return this.absoluteTimeout; } } internal void Add( InternalTransaction newTx ) { while( !this.headBucket.Add( newTx )) ; } internal void TimeoutTransactions() { Bucket currentBucket = this.headBucket; // It will always have a head. do { currentBucket.TimeoutTransactions(); WeakReference nextWeakBucket = (WeakReference)currentBucket.nextBucketWeak; if( nextWeakBucket != null ) { currentBucket = (Bucket)nextWeakBucket.Target; } else { currentBucket = null; } } while( currentBucket != null ); } } class Bucket { bool timedOut; int index; int size; InternalTransaction [] transactions; internal WeakReference nextBucketWeak; Bucket previous; BucketSet owningSet; internal Bucket( BucketSet owningSet ) { this.timedOut = false; this.index = -1; this.size = 1024; // A possible design change here is to have this scale dynamically based on load. transactions = new InternalTransaction [this.size]; this.owningSet = owningSet; } internal bool Add( InternalTransaction tx ) { int currentIndex = Interlocked.Increment( ref this.index ); if( currentIndex < this.size ) { tx.tableBucket = this; tx.bucketIndex = currentIndex; Thread.MemoryBarrier(); // This data must be written before the transaction // could be timed out. this.transactions [currentIndex] = tx; if( this.timedOut ) { lock( tx ) { tx.State.Timeout( tx ); } } } else { Bucket newBucket = new Bucket( this.owningSet ); newBucket.nextBucketWeak = new WeakReference( this ); Bucket oldBucket = Interlocked.CompareExchange( ref this.owningSet.headBucket, newBucket, this ); if( oldBucket == this ) { // ladies and gentlemen we have a winner. this.previous = newBucket; } return false; } return true; } internal void Remove( InternalTransaction tx ) { this.transactions [tx.bucketIndex] = null; } internal void TimeoutTransactions() { int i; int transactionCount = this.index; this.timedOut = true; Thread.MemoryBarrier(); for( i = 0; i <= transactionCount && i < this.size; i++ ) { Debug.Assert( transactionCount == this.index, "Index changed timing out transactions" ); InternalTransaction tx = this.transactions [i]; if( tx != null ) { lock( tx ) { tx.State.Timeout( tx ); } } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- SystemEvents.cs
- DataRecordInternal.cs
- TemplateInstanceAttribute.cs
- WeakReference.cs
- Int16AnimationBase.cs
- EventMappingSettings.cs
- EventLog.cs
- DocumentGrid.cs
- TextContainerChangeEventArgs.cs
- XslException.cs
- IdentityValidationException.cs
- XPathScanner.cs
- SecurityPermission.cs
- FrameworkObject.cs
- ExtendedTransformFactory.cs
- XamlReaderHelper.cs
- FontWeightConverter.cs
- XmlILAnnotation.cs
- StateMachineAction.cs
- PaintEvent.cs
- KeyedHashAlgorithm.cs
- WindowsGraphics2.cs
- Focus.cs
- TrackingServices.cs
- LayoutTable.cs
- Lasso.cs
- Command.cs
- DataTablePropertyDescriptor.cs
- ObjectSecurity.cs
- SqlPersonalizationProvider.cs
- XmlCodeExporter.cs
- MembershipValidatePasswordEventArgs.cs
- StringOutput.cs
- SiteMapNodeItem.cs
- MemoryFailPoint.cs
- BuildResult.cs
- HeaderedContentControl.cs
- XmlSerializationWriter.cs
- ContentHostHelper.cs
- XmlSerializationWriter.cs
- XpsS0ValidatingLoader.cs
- AnnotationResourceCollection.cs
- WebServiceClientProxyGenerator.cs
- FontFamily.cs
- CatalogPartChrome.cs
- XmlSchemaGroupRef.cs
- BaseHashHelper.cs
- QilLoop.cs
- HostExecutionContextManager.cs
- TypeViewSchema.cs
- EntityDataSourceContextCreatingEventArgs.cs
- EncoderParameter.cs
- NumericUpDown.cs
- TextRunCache.cs
- HtmlShimManager.cs
- BufferModesCollection.cs
- DataServiceResponse.cs
- EventToken.cs
- SessionSwitchEventArgs.cs
- XmlTextReaderImplHelpers.cs
- ActivityExecutor.cs
- BrowserCapabilitiesCodeGenerator.cs
- Rules.cs
- Context.cs
- SamlAssertionKeyIdentifierClause.cs
- ClientUrlResolverWrapper.cs
- ZoneMembershipCondition.cs
- Figure.cs
- DataKeyCollection.cs
- DSACryptoServiceProvider.cs
- CodeVariableDeclarationStatement.cs
- AnnotationComponentManager.cs
- RsaSecurityKey.cs
- baseaxisquery.cs
- ByteFacetDescriptionElement.cs
- HttpHeaderCollection.cs
- BamlVersionHeader.cs
- AudioDeviceOut.cs
- IDictionary.cs
- PeerChannelFactory.cs
- FacetValueContainer.cs
- _CacheStreams.cs
- ResourceDisplayNameAttribute.cs
- RadioButtonRenderer.cs
- CachedPathData.cs
- DiffuseMaterial.cs
- AssemblyInfo.cs
- RSAProtectedConfigurationProvider.cs
- AdCreatedEventArgs.cs
- ComponentCommands.cs
- DocumentViewerAutomationPeer.cs
- serverconfig.cs
- RelatedView.cs
- AccessDataSourceWizardForm.cs
- RangeValuePatternIdentifiers.cs
- PenContexts.cs
- UrlUtility.cs
- ListSourceHelper.cs
- FillBehavior.cs
- SharedDp.cs