TransactionTable.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / tx / System / Transactions / TransactionTable.cs / 1305376 / TransactionTable.cs

                            //------------------------------------------------------------------------------ 
// 
//     Copyright (c) Microsoft Corporation.  All rights reserved.
// 
//----------------------------------------------------------------------------- 

namespace System.Transactions 
{ 
    using System;
    using System.Diagnostics; 
    using System.Threading;


    class CheapUnfairReaderWriterLock 
    {
        object writerFinishedEvent; 
 
        int readersIn;
        int readersOut; 
        bool writerPresent;

        object syncRoot;
 
        // Spin lock params
        const int MAX_SPIN_COUNT = 100; 
        const int SLEEP_TIME = 500; 

 
        public CheapUnfairReaderWriterLock()
        {
        }
 

        object SyncRoot 
        { 
            get
            { 
                if( this.syncRoot == null )
                {
                    Interlocked.CompareExchange( ref this.syncRoot, new object(), null );
                } 
                return this.syncRoot;
            } 
        } 

 
        bool ReadersPresent
        {
            get
            { 
                return this.readersIn != this.readersOut;
            } 
        } 

 
        ManualResetEvent WriterFinishedEvent
        {
            get
            { 
                if( this.writerFinishedEvent == null )
                { 
                    Interlocked.CompareExchange( ref this.writerFinishedEvent, new ManualResetEvent( true ), null ); 
                }
                return (ManualResetEvent)this.writerFinishedEvent; 
            }
        }

 
        public int AcquireReaderLock()
        { 
            int readerIndex = 0; 
            do
            { 
                if( this.writerPresent )
                {
                    WriterFinishedEvent.WaitOne();
                } 

                readerIndex = Interlocked.Increment( ref this.readersIn ); 
 
                if( !this.writerPresent )
                { 
                    break;
                }

                Interlocked.Decrement( ref this.readersIn ); 
            } while( true );
 
            return readerIndex; 
        }
 

        public void AcquireWriterLock()
        {
#pragma warning disable 0618 
            //@
            Monitor.Enter(this.SyncRoot); 
#pragma warning restore 0618 

            this.writerPresent = true; 
            this.WriterFinishedEvent.Reset();

            do
            { 
                int i = 0;
                while( ReadersPresent && i < MAX_SPIN_COUNT ) 
                { 
                    Thread.Sleep( 0 );
                    i++; 
                }

                if( ReadersPresent )
                { 
                    Thread.Sleep( SLEEP_TIME );
                } 
            } while( ReadersPresent ); 
        }
 

        public void ReleaseReaderLock()
        {
            Interlocked.Increment( ref this.readersOut ); 
        }
 
 
        public void ReleaseWriterLock()
        { 
            try
            {
                this.writerPresent = false;
                this.WriterFinishedEvent.Set(); 
            }
            finally 
            { 
                Monitor.Exit( this.SyncRoot );
            } 
        }
    }

 
    // This transaction table implementation uses an array of lists to avoid contention.  The list for a
    // transaction is decided by its hashcode. 
    class TransactionTable 
    {
        // Use a timer to initiate looking for transactions that have timed out. 
        System.Threading.Timer timer;

        // Private storage noting if the timer is enabled.
        bool timerEnabled; 

        // Store the timer interval 
        const int timerInternalExponent = 9; 
        int timerInterval;
 
        // Store the number of ticks.  A tick is a mark of 1 timer interval.  By counting ticks
        // we can avoid expensive calls to get the current time for every transaction creation.
        const long TicksPerMillisecond = 10000;
        long ticks; 
        Int64 lastTimerTime;
 
        // Sets of arrays of transactions. 
        BucketSet headBucketSet;
 
        // Synchronize adding transactions with shutting off the timer and started events.
        CheapUnfairReaderWriterLock rwLock;

        internal TransactionTable() 
        {
            // Create a timer that is initially disabled by specifing an Infinite time to the first interval 
            this.timer = new Timer( new TimerCallback(ThreadTimer), null, Timeout.Infinite, this.timerInterval ); 

            // Note that the timer is disabled 
            this.timerEnabled = false;

            // Store the timer interval
            this.timerInterval = 1 << TransactionTable.timerInternalExponent; 

            // Ticks start off at zero. 
            this.ticks = 0; 

            // The head of the list is long.MaxValue.  It contains all of the transactions that for 
            // some reason or other don't have a timeout.
            this.headBucketSet = new BucketSet( this, long.MaxValue );

            // Allocate the lock 
            rwLock = new CheapUnfairReaderWriterLock();
        } 
 

        // Calculate the maximum number of ticks for which this transaction should live 
        internal long TimeoutTicks( TimeSpan timeout )
        {
            if( timeout != TimeSpan.Zero )
            { 
                // Note: At the current setting of approximately 2 ticks per second this timer will
                //       wrap in approximately 2^64/2/60/60/24/365=292,471,208,677.5360162195585996 
                //       (nearly 300 billion) years. 
                long timeoutTicks = ((timeout.Ticks / TimeSpan.TicksPerMillisecond) >>
                        TransactionTable.timerInternalExponent) + this.ticks; 
                return timeoutTicks;
            }
            else
            { 
                return long.MaxValue;
            } 
        } 

 
        // Absolute timeout
        internal TimeSpan RecalcTimeout( InternalTransaction tx )
        {
            return TimeSpan.FromMilliseconds( (tx.AbsoluteTimeout - this.ticks) * this.timerInterval ); 
        }
 
 
        // Creation time
        private Int64 CurrentTime 
        {
            get
            {
                if( this.timerEnabled ) 
                {
                    return this.lastTimerTime; 
                } 
                else
                { 
                    return DateTime.UtcNow.Ticks;
                }
            }
        } 

 
        // Add a transaction to the table.  Transactions are added to the end of the list in sorted order based on their 
        // absolute timeout.
        internal int Add( InternalTransaction txNew ) 
        {
            // Tell the runtime that we are modifying global state.
            Thread.BeginCriticalRegion();
            int readerIndex = 0; 

            try 
            { 
                readerIndex = rwLock.AcquireReaderLock();
                try 
                {
                    // Start the timer if needed before checking the current time since the current
                    // time can be more efficient with a running timer.
                    if( txNew.AbsoluteTimeout != long.MaxValue ) 
                    {
                        if( !this.timerEnabled ) 
                        { 
                            if( !this.timer.Change( this.timerInterval, this.timerInterval ))
                            { 
                                throw TransactionException.CreateInvalidOperationException(
                                    SR.GetString( SR.TraceSourceLtm ),
                                    SR.GetString(SR.UnexpectedTimerFailure),
                                    null 
                                    );
                            } 
                            this.lastTimerTime = DateTime.UtcNow.Ticks; 
                            this.timerEnabled = true;
                        } 
                    }
                    txNew.CreationTime = CurrentTime;

                    AddIter( txNew ); 
                }
                finally 
                { 
                    rwLock.ReleaseReaderLock();
                } 
            }
            finally
            {
                Thread.EndCriticalRegion(); 
            }
 
            return readerIndex; 
        }
 

        void AddIter( InternalTransaction txNew )
        {
            // 
            // Theory of operation.
            // 
            // Note that the head bucket contains any transaction with essentially infinite 
            // timeout (long.MaxValue).  The list is sorted in decending order.  To add
            // a node the code must walk down the list looking for a set of bucket that matches 
            // the absolute timeout value for the transaction.  When it is found it passes
            // the insert down to that set.
            //
            // An importent thing to note about the list is that forward links are all weak 
            // references and reverse links are all strong references.  This allows the GC
            // to clean up old links in the list so that they don't need to be removed manually. 
            // However if there is still a rooted strong reference to an old link in the 
            // chain that link wont fall off the list because there is a strong reference held
            // forward. 
            //

            BucketSet currentBucketSet = this.headBucketSet;
 
            while( currentBucketSet.AbsoluteTimeout != txNew.AbsoluteTimeout )
            { 
                BucketSet lastBucketSet = null; 
                do
                { 
                    WeakReference nextSetWeak = (WeakReference)currentBucketSet.nextSetWeak;
                    BucketSet nextBucketSet = null;
                    if( nextSetWeak != null )
                    { 
                        nextBucketSet = (BucketSet)nextSetWeak.Target;
                    } 
 
                    if( nextBucketSet == null )
                    { 
                        //
                        // We've reached the end of the list either because nextSetWeak was null or
                        // because its reference was collected.  This code doesn't care.  Make a new
                        // set, attempt to attach it and move on. 
                        //
                        BucketSet newBucketSet = new BucketSet( this, txNew.AbsoluteTimeout ); 
                        WeakReference newSetWeak = new WeakReference( newBucketSet ); 

                        WeakReference oldNextSetWeak = (WeakReference)Interlocked.CompareExchange( 
                            ref currentBucketSet.nextSetWeak, newSetWeak, nextSetWeak );
                        if( oldNextSetWeak == nextSetWeak )
                        {
                            // Ladies and Gentlemen we have a winner. 
                            newBucketSet.prevSet = currentBucketSet;
                        } 
 
                        // Note that at this point we don't update currentBucketSet.  On the next loop
                        // iteration we should be able to pick up where we left off. 
                    }
                    else
                    {
                        lastBucketSet = currentBucketSet; 
                        currentBucketSet = nextBucketSet;
                    } 
                }while( currentBucketSet.AbsoluteTimeout > txNew.AbsoluteTimeout ); 

                if( currentBucketSet.AbsoluteTimeout != txNew.AbsoluteTimeout ) 
                {
                    //
                    // Getting to here means that we've found a slot in the list where this bucket set should go.
                    // 
                    BucketSet newBucketSet = new BucketSet( this, txNew.AbsoluteTimeout );
                    WeakReference newSetWeak = new WeakReference( newBucketSet ); 
 
                    newBucketSet.nextSetWeak = lastBucketSet.nextSetWeak;
                    WeakReference oldNextSetWeak = (WeakReference)Interlocked.CompareExchange( 
                        ref lastBucketSet.nextSetWeak, newSetWeak, newBucketSet.nextSetWeak );
                    if( oldNextSetWeak == newBucketSet.nextSetWeak )
                    {
                        // Ladies and Gentlemen we have a winner. 
                        if( oldNextSetWeak != null )
                        { 
                            BucketSet oldSet = (BucketSet)oldNextSetWeak.Target; 
                            if( oldSet != null )
                            { 
                                // prev references are just there to root things for the GC.  If this object is
                                // gone we don't really care.
                                oldSet.prevSet = newBucketSet;
                            } 
                        }
                        newBucketSet.prevSet = currentBucketSet; 
                    } 

                    currentBucketSet = lastBucketSet; 
                    lastBucketSet = null;

                    // The outer loop will iterate and pick up where we left off.
                } 
            }
 
            // 
            // Great we found a spot.
            // 
            currentBucketSet.Add( txNew );
        }

 
        // Remove a transaction from the table.
        internal void Remove( InternalTransaction tx ) 
        { 
            tx.tableBucket.Remove( tx );
            tx.tableBucket = null; 
        }


        // Process a timer event 
        private void ThreadTimer( Object state )
        { 
            // 
            // Theory of operation.
            // 
            // To timeout transactions we must walk down the list starting from the head
            // until we find a link with an absolute timeout that is greater than our own.
            // At that point everything further down in the list is elegable to be timed
            // out.  So simply remove that link in the list and walk down from that point 
            // timing out any transaction that is found.
            // 
 
            // There could be a ---- between this callback being queued and the timer
            // being disabled.  If we get here when the timer is disabled, just return. 
            if ( !this.timerEnabled )
            {
                return;
            } 

            // Increment the number of ticks 
            this.ticks++; 
            this.lastTimerTime = DateTime.UtcNow.Ticks;
 
            //
            // First find the starting point of transactions that should time out.  Every transaction after
            // that point will timeout so once we've found it then it is just a matter of traversing the
            // structure. 
            //
            BucketSet lastBucketSet = null; 
            BucketSet currentBucketSet = this.headBucketSet; // The list always has a head. 

            WeakReference nextWeakSet = (WeakReference)currentBucketSet.nextSetWeak; 
            BucketSet nextBucketSet = null;
            if( nextWeakSet != null )
            {
                nextBucketSet = (BucketSet)nextWeakSet.Target; 
            }
            if( nextBucketSet == null ) 
            { 
                //
                // Special case to allow for disabling the timer. 
                //
                this.rwLock.AcquireWriterLock();
                try
                { 
                    // If there are no transactions on the timeout list we can disable the
                    // timer. 
                    if( !this.timer.Change( Timeout.Infinite, Timeout.Infinite )) 
                    {
                        throw TransactionException.CreateInvalidOperationException( 
                            SR.GetString( SR.TraceSourceLtm ),
                            SR.GetString(SR.UnexpectedTimerFailure),
                            null
                            ); 
                    }
                    this.timerEnabled = false; 
 
                    return;
                } 
                finally
                {
                    this.rwLock.ReleaseWriterLock();
                } 
            }
 
            // Note it is slightly subtle that we always skip the head node.  This is done 
            // on purpose because the head node contains transactions with essentially
            // an infinite timeout. 
            do
            {
                do
                { 
                    nextWeakSet = (WeakReference)currentBucketSet.nextSetWeak;
                    if( nextWeakSet == null ) 
                    { 
                        // Nothing more to do.
                        return; 
                    }

                    nextBucketSet = (BucketSet)nextWeakSet.Target;
                    if( nextBucketSet == null ) 
                    {
                        // Again nothing more to do. 
                        return; 
                    }
                    lastBucketSet = currentBucketSet; 
                    currentBucketSet = nextBucketSet;
                }
                while( currentBucketSet.AbsoluteTimeout > this.ticks );
 
                // Tell the runtime that we are modifying global state.
                Thread.BeginCriticalRegion(); 
                try 
                {
                    // 
                    // Pinch off the list at this point making sure it is still the correct set.
                    //
                    WeakReference abortingSetsWeak =
                        (WeakReference)Interlocked.CompareExchange( ref lastBucketSet.nextSetWeak, null, nextWeakSet ); 

                    if( abortingSetsWeak == nextWeakSet ) 
                    { 
                        // Yea - now proceed to abort the transactions.
                        BucketSet abortingBucketSets = null; 

                        do
                        {
                            if( abortingSetsWeak != null ) 
                            {
                                abortingBucketSets = (BucketSet)abortingSetsWeak.Target; 
                            } 
                            else
                            { 
                                abortingBucketSets = null;
                            }
                            if( abortingBucketSets != null )
                            { 
                                abortingBucketSets.TimeoutTransactions();
                                abortingSetsWeak = (WeakReference)abortingBucketSets.nextSetWeak; 
                            } 
                        } while( abortingBucketSets != null );
 
                        // That's all we needed to do.
                        break;
                    }
                } 
                finally
                { 
                    Thread.EndCriticalRegion(); 
                }
 
                // We missed pulling the right transactions off.  Loop back up and try again.
                currentBucketSet = lastBucketSet;
            }while( true );
        } 
    }
 
 
    class BucketSet
    { 
        // Buckets are kept in sets.  Each element of a set will have the same absoluteTimeout.
        internal object nextSetWeak;
        internal BucketSet prevSet;
 
        TransactionTable table;
 
        long absoluteTimeout; 

        internal Bucket headBucket; 

        internal BucketSet( TransactionTable table, long absoluteTimeout )
        {
            this.headBucket = new Bucket( this ); 
            this.table = table;
            this.absoluteTimeout = absoluteTimeout; 
        } 

 
        internal long AbsoluteTimeout
        {
            get
            { 
                return this.absoluteTimeout;
            } 
        } 

 
        internal void Add( InternalTransaction newTx )
        {
            while( !this.headBucket.Add( newTx ))
                ; 
        }
 
 
        internal void TimeoutTransactions()
        { 
            Bucket currentBucket = this.headBucket;
            // It will always have a head.
            do
            { 
                currentBucket.TimeoutTransactions();
 
                WeakReference nextWeakBucket = (WeakReference)currentBucket.nextBucketWeak; 
                if( nextWeakBucket != null )
                { 
                    currentBucket = (Bucket)nextWeakBucket.Target;
                }
                else
                { 
                    currentBucket = null;
                } 
            } while( currentBucket != null ); 
        }
    } 


    class Bucket
    { 
        bool timedOut;
        int index; 
        int size; 
        InternalTransaction [] transactions;
        internal WeakReference nextBucketWeak; 
        Bucket previous;

        BucketSet owningSet;
 
        internal Bucket( BucketSet owningSet )
        { 
            this.timedOut = false; 
            this.index = -1;
            this.size = 1024; // A possible design change here is to have this scale dynamically based on load. 
            transactions = new InternalTransaction [this.size];
            this.owningSet = owningSet;
        }
 

        internal bool Add( InternalTransaction tx ) 
        { 
            int currentIndex = Interlocked.Increment( ref this.index );
            if( currentIndex < this.size ) 
            {
                tx.tableBucket = this;
                tx.bucketIndex = currentIndex;
                Thread.MemoryBarrier(); // This data must be written before the transaction 
                                        // could be timed out.
                this.transactions [currentIndex] = tx; 
 
                if( this.timedOut )
                { 
                    lock( tx )
                    {
                        tx.State.Timeout( tx );
                    } 
                }
            } 
            else 
            {
                Bucket newBucket = new Bucket( this.owningSet ); 
                newBucket.nextBucketWeak = new WeakReference( this );

                Bucket oldBucket = Interlocked.CompareExchange( ref this.owningSet.headBucket, newBucket, this );
                if( oldBucket == this ) 
                {
                    // ladies and gentlemen we have a winner. 
                    this.previous = newBucket; 
                }
 
                return false;
            }
            return true;
        } 

 
        internal void Remove( InternalTransaction tx ) 
        {
            this.transactions [tx.bucketIndex] = null; 
        }


        internal void TimeoutTransactions() 
        {
            int i; 
            int transactionCount = this.index; 

            this.timedOut = true; 
            Thread.MemoryBarrier();

            for( i = 0; i <= transactionCount && i < this.size; i++ )
            { 
                Debug.Assert( transactionCount == this.index, "Index changed timing out transactions" );
                InternalTransaction tx = this.transactions [i]; 
                if( tx != null ) 
                {
                    lock( tx ) 
                    {
                        tx.State.Timeout( tx );
                    }
                } 
            }
        } 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
//------------------------------------------------------------------------------ 
// 
//     Copyright (c) Microsoft Corporation.  All rights reserved.
// 
//----------------------------------------------------------------------------- 

namespace System.Transactions 
{ 
    using System;
    using System.Diagnostics; 
    using System.Threading;


    class CheapUnfairReaderWriterLock 
    {
        object writerFinishedEvent; 
 
        int readersIn;
        int readersOut; 
        bool writerPresent;

        object syncRoot;
 
        // Spin lock params
        const int MAX_SPIN_COUNT = 100; 
        const int SLEEP_TIME = 500; 

 
        public CheapUnfairReaderWriterLock()
        {
        }
 

        object SyncRoot 
        { 
            get
            { 
                if( this.syncRoot == null )
                {
                    Interlocked.CompareExchange( ref this.syncRoot, new object(), null );
                } 
                return this.syncRoot;
            } 
        } 

 
        bool ReadersPresent
        {
            get
            { 
                return this.readersIn != this.readersOut;
            } 
        } 

 
        ManualResetEvent WriterFinishedEvent
        {
            get
            { 
                if( this.writerFinishedEvent == null )
                { 
                    Interlocked.CompareExchange( ref this.writerFinishedEvent, new ManualResetEvent( true ), null ); 
                }
                return (ManualResetEvent)this.writerFinishedEvent; 
            }
        }

 
        public int AcquireReaderLock()
        { 
            int readerIndex = 0; 
            do
            { 
                if( this.writerPresent )
                {
                    WriterFinishedEvent.WaitOne();
                } 

                readerIndex = Interlocked.Increment( ref this.readersIn ); 
 
                if( !this.writerPresent )
                { 
                    break;
                }

                Interlocked.Decrement( ref this.readersIn ); 
            } while( true );
 
            return readerIndex; 
        }
 

        public void AcquireWriterLock()
        {
#pragma warning disable 0618 
            //@
            Monitor.Enter(this.SyncRoot); 
#pragma warning restore 0618 

            this.writerPresent = true; 
            this.WriterFinishedEvent.Reset();

            do
            { 
                int i = 0;
                while( ReadersPresent && i < MAX_SPIN_COUNT ) 
                { 
                    Thread.Sleep( 0 );
                    i++; 
                }

                if( ReadersPresent )
                { 
                    Thread.Sleep( SLEEP_TIME );
                } 
            } while( ReadersPresent ); 
        }
 

        public void ReleaseReaderLock()
        {
            Interlocked.Increment( ref this.readersOut ); 
        }
 
 
        public void ReleaseWriterLock()
        { 
            try
            {
                this.writerPresent = false;
                this.WriterFinishedEvent.Set(); 
            }
            finally 
            { 
                Monitor.Exit( this.SyncRoot );
            } 
        }
    }

 
    // This transaction table implementation uses an array of lists to avoid contention.  The list for a
    // transaction is decided by its hashcode. 
    class TransactionTable 
    {
        // Use a timer to initiate looking for transactions that have timed out. 
        System.Threading.Timer timer;

        // Private storage noting if the timer is enabled.
        bool timerEnabled; 

        // Store the timer interval 
        const int timerInternalExponent = 9; 
        int timerInterval;
 
        // Store the number of ticks.  A tick is a mark of 1 timer interval.  By counting ticks
        // we can avoid expensive calls to get the current time for every transaction creation.
        const long TicksPerMillisecond = 10000;
        long ticks; 
        Int64 lastTimerTime;
 
        // Sets of arrays of transactions. 
        BucketSet headBucketSet;
 
        // Synchronize adding transactions with shutting off the timer and started events.
        CheapUnfairReaderWriterLock rwLock;

        internal TransactionTable() 
        {
            // Create a timer that is initially disabled by specifing an Infinite time to the first interval 
            this.timer = new Timer( new TimerCallback(ThreadTimer), null, Timeout.Infinite, this.timerInterval ); 

            // Note that the timer is disabled 
            this.timerEnabled = false;

            // Store the timer interval
            this.timerInterval = 1 << TransactionTable.timerInternalExponent; 

            // Ticks start off at zero. 
            this.ticks = 0; 

            // The head of the list is long.MaxValue.  It contains all of the transactions that for 
            // some reason or other don't have a timeout.
            this.headBucketSet = new BucketSet( this, long.MaxValue );

            // Allocate the lock 
            rwLock = new CheapUnfairReaderWriterLock();
        } 
 

        // Calculate the maximum number of ticks for which this transaction should live 
        internal long TimeoutTicks( TimeSpan timeout )
        {
            if( timeout != TimeSpan.Zero )
            { 
                // Note: At the current setting of approximately 2 ticks per second this timer will
                //       wrap in approximately 2^64/2/60/60/24/365=292,471,208,677.5360162195585996 
                //       (nearly 300 billion) years. 
                long timeoutTicks = ((timeout.Ticks / TimeSpan.TicksPerMillisecond) >>
                        TransactionTable.timerInternalExponent) + this.ticks; 
                return timeoutTicks;
            }
            else
            { 
                return long.MaxValue;
            } 
        } 

 
        // Absolute timeout
        internal TimeSpan RecalcTimeout( InternalTransaction tx )
        {
            return TimeSpan.FromMilliseconds( (tx.AbsoluteTimeout - this.ticks) * this.timerInterval ); 
        }
 
 
        // Creation time
        private Int64 CurrentTime 
        {
            get
            {
                if( this.timerEnabled ) 
                {
                    return this.lastTimerTime; 
                } 
                else
                { 
                    return DateTime.UtcNow.Ticks;
                }
            }
        } 

 
        // Add a transaction to the table.  Transactions are added to the end of the list in sorted order based on their 
        // absolute timeout.
        internal int Add( InternalTransaction txNew ) 
        {
            // Tell the runtime that we are modifying global state.
            Thread.BeginCriticalRegion();
            int readerIndex = 0; 

            try 
            { 
                readerIndex = rwLock.AcquireReaderLock();
                try 
                {
                    // Start the timer if needed before checking the current time since the current
                    // time can be more efficient with a running timer.
                    if( txNew.AbsoluteTimeout != long.MaxValue ) 
                    {
                        if( !this.timerEnabled ) 
                        { 
                            if( !this.timer.Change( this.timerInterval, this.timerInterval ))
                            { 
                                throw TransactionException.CreateInvalidOperationException(
                                    SR.GetString( SR.TraceSourceLtm ),
                                    SR.GetString(SR.UnexpectedTimerFailure),
                                    null 
                                    );
                            } 
                            this.lastTimerTime = DateTime.UtcNow.Ticks; 
                            this.timerEnabled = true;
                        } 
                    }
                    txNew.CreationTime = CurrentTime;

                    AddIter( txNew ); 
                }
                finally 
                { 
                    rwLock.ReleaseReaderLock();
                } 
            }
            finally
            {
                Thread.EndCriticalRegion(); 
            }
 
            return readerIndex; 
        }
 

        void AddIter( InternalTransaction txNew )
        {
            // 
            // Theory of operation.
            // 
            // Note that the head bucket contains any transaction with essentially infinite 
            // timeout (long.MaxValue).  The list is sorted in decending order.  To add
            // a node the code must walk down the list looking for a set of bucket that matches 
            // the absolute timeout value for the transaction.  When it is found it passes
            // the insert down to that set.
            //
            // An importent thing to note about the list is that forward links are all weak 
            // references and reverse links are all strong references.  This allows the GC
            // to clean up old links in the list so that they don't need to be removed manually. 
            // However if there is still a rooted strong reference to an old link in the 
            // chain that link wont fall off the list because there is a strong reference held
            // forward. 
            //

            BucketSet currentBucketSet = this.headBucketSet;
 
            while( currentBucketSet.AbsoluteTimeout != txNew.AbsoluteTimeout )
            { 
                BucketSet lastBucketSet = null; 
                do
                { 
                    WeakReference nextSetWeak = (WeakReference)currentBucketSet.nextSetWeak;
                    BucketSet nextBucketSet = null;
                    if( nextSetWeak != null )
                    { 
                        nextBucketSet = (BucketSet)nextSetWeak.Target;
                    } 
 
                    if( nextBucketSet == null )
                    { 
                        //
                        // We've reached the end of the list either because nextSetWeak was null or
                        // because its reference was collected.  This code doesn't care.  Make a new
                        // set, attempt to attach it and move on. 
                        //
                        BucketSet newBucketSet = new BucketSet( this, txNew.AbsoluteTimeout ); 
                        WeakReference newSetWeak = new WeakReference( newBucketSet ); 

                        WeakReference oldNextSetWeak = (WeakReference)Interlocked.CompareExchange( 
                            ref currentBucketSet.nextSetWeak, newSetWeak, nextSetWeak );
                        if( oldNextSetWeak == nextSetWeak )
                        {
                            // Ladies and Gentlemen we have a winner. 
                            newBucketSet.prevSet = currentBucketSet;
                        } 
 
                        // Note that at this point we don't update currentBucketSet.  On the next loop
                        // iteration we should be able to pick up where we left off. 
                    }
                    else
                    {
                        lastBucketSet = currentBucketSet; 
                        currentBucketSet = nextBucketSet;
                    } 
                }while( currentBucketSet.AbsoluteTimeout > txNew.AbsoluteTimeout ); 

                if( currentBucketSet.AbsoluteTimeout != txNew.AbsoluteTimeout ) 
                {
                    //
                    // Getting to here means that we've found a slot in the list where this bucket set should go.
                    // 
                    BucketSet newBucketSet = new BucketSet( this, txNew.AbsoluteTimeout );
                    WeakReference newSetWeak = new WeakReference( newBucketSet ); 
 
                    newBucketSet.nextSetWeak = lastBucketSet.nextSetWeak;
                    WeakReference oldNextSetWeak = (WeakReference)Interlocked.CompareExchange( 
                        ref lastBucketSet.nextSetWeak, newSetWeak, newBucketSet.nextSetWeak );
                    if( oldNextSetWeak == newBucketSet.nextSetWeak )
                    {
                        // Ladies and Gentlemen we have a winner. 
                        if( oldNextSetWeak != null )
                        { 
                            BucketSet oldSet = (BucketSet)oldNextSetWeak.Target; 
                            if( oldSet != null )
                            { 
                                // prev references are just there to root things for the GC.  If this object is
                                // gone we don't really care.
                                oldSet.prevSet = newBucketSet;
                            } 
                        }
                        newBucketSet.prevSet = currentBucketSet; 
                    } 

                    currentBucketSet = lastBucketSet; 
                    lastBucketSet = null;

                    // The outer loop will iterate and pick up where we left off.
                } 
            }
 
            // 
            // Great we found a spot.
            // 
            currentBucketSet.Add( txNew );
        }

 
        // Remove a transaction from the table.
        internal void Remove( InternalTransaction tx ) 
        { 
            tx.tableBucket.Remove( tx );
            tx.tableBucket = null; 
        }


        // Process a timer event 
        private void ThreadTimer( Object state )
        { 
            // 
            // Theory of operation.
            // 
            // To timeout transactions we must walk down the list starting from the head
            // until we find a link with an absolute timeout that is greater than our own.
            // At that point everything further down in the list is elegable to be timed
            // out.  So simply remove that link in the list and walk down from that point 
            // timing out any transaction that is found.
            // 
 
            // There could be a ---- between this callback being queued and the timer
            // being disabled.  If we get here when the timer is disabled, just return. 
            if ( !this.timerEnabled )
            {
                return;
            } 

            // Increment the number of ticks 
            this.ticks++; 
            this.lastTimerTime = DateTime.UtcNow.Ticks;
 
            //
            // First find the starting point of transactions that should time out.  Every transaction after
            // that point will timeout so once we've found it then it is just a matter of traversing the
            // structure. 
            //
            BucketSet lastBucketSet = null; 
            BucketSet currentBucketSet = this.headBucketSet; // The list always has a head. 

            WeakReference nextWeakSet = (WeakReference)currentBucketSet.nextSetWeak; 
            BucketSet nextBucketSet = null;
            if( nextWeakSet != null )
            {
                nextBucketSet = (BucketSet)nextWeakSet.Target; 
            }
            if( nextBucketSet == null ) 
            { 
                //
                // Special case to allow for disabling the timer. 
                //
                this.rwLock.AcquireWriterLock();
                try
                { 
                    // If there are no transactions on the timeout list we can disable the
                    // timer. 
                    if( !this.timer.Change( Timeout.Infinite, Timeout.Infinite )) 
                    {
                        throw TransactionException.CreateInvalidOperationException( 
                            SR.GetString( SR.TraceSourceLtm ),
                            SR.GetString(SR.UnexpectedTimerFailure),
                            null
                            ); 
                    }
                    this.timerEnabled = false; 
 
                    return;
                } 
                finally
                {
                    this.rwLock.ReleaseWriterLock();
                } 
            }
 
            // Note it is slightly subtle that we always skip the head node.  This is done 
            // on purpose because the head node contains transactions with essentially
            // an infinite timeout. 
            do
            {
                do
                { 
                    nextWeakSet = (WeakReference)currentBucketSet.nextSetWeak;
                    if( nextWeakSet == null ) 
                    { 
                        // Nothing more to do.
                        return; 
                    }

                    nextBucketSet = (BucketSet)nextWeakSet.Target;
                    if( nextBucketSet == null ) 
                    {
                        // Again nothing more to do. 
                        return; 
                    }
                    lastBucketSet = currentBucketSet; 
                    currentBucketSet = nextBucketSet;
                }
                while( currentBucketSet.AbsoluteTimeout > this.ticks );
 
                // Tell the runtime that we are modifying global state.
                Thread.BeginCriticalRegion(); 
                try 
                {
                    // 
                    // Pinch off the list at this point making sure it is still the correct set.
                    //
                    WeakReference abortingSetsWeak =
                        (WeakReference)Interlocked.CompareExchange( ref lastBucketSet.nextSetWeak, null, nextWeakSet ); 

                    if( abortingSetsWeak == nextWeakSet ) 
                    { 
                        // Yea - now proceed to abort the transactions.
                        BucketSet abortingBucketSets = null; 

                        do
                        {
                            if( abortingSetsWeak != null ) 
                            {
                                abortingBucketSets = (BucketSet)abortingSetsWeak.Target; 
                            } 
                            else
                            { 
                                abortingBucketSets = null;
                            }
                            if( abortingBucketSets != null )
                            { 
                                abortingBucketSets.TimeoutTransactions();
                                abortingSetsWeak = (WeakReference)abortingBucketSets.nextSetWeak; 
                            } 
                        } while( abortingBucketSets != null );
 
                        // That's all we needed to do.
                        break;
                    }
                } 
                finally
                { 
                    Thread.EndCriticalRegion(); 
                }
 
                // We missed pulling the right transactions off.  Loop back up and try again.
                currentBucketSet = lastBucketSet;
            }while( true );
        } 
    }
 
 
    class BucketSet
    { 
        // Buckets are kept in sets.  Each element of a set will have the same absoluteTimeout.
        internal object nextSetWeak;
        internal BucketSet prevSet;
 
        TransactionTable table;
 
        long absoluteTimeout; 

        internal Bucket headBucket; 

        internal BucketSet( TransactionTable table, long absoluteTimeout )
        {
            this.headBucket = new Bucket( this ); 
            this.table = table;
            this.absoluteTimeout = absoluteTimeout; 
        } 

 
        internal long AbsoluteTimeout
        {
            get
            { 
                return this.absoluteTimeout;
            } 
        } 

 
        internal void Add( InternalTransaction newTx )
        {
            while( !this.headBucket.Add( newTx ))
                ; 
        }
 
 
        internal void TimeoutTransactions()
        { 
            Bucket currentBucket = this.headBucket;
            // It will always have a head.
            do
            { 
                currentBucket.TimeoutTransactions();
 
                WeakReference nextWeakBucket = (WeakReference)currentBucket.nextBucketWeak; 
                if( nextWeakBucket != null )
                { 
                    currentBucket = (Bucket)nextWeakBucket.Target;
                }
                else
                { 
                    currentBucket = null;
                } 
            } while( currentBucket != null ); 
        }
    } 


    class Bucket
    { 
        bool timedOut;
        int index; 
        int size; 
        InternalTransaction [] transactions;
        internal WeakReference nextBucketWeak; 
        Bucket previous;

        BucketSet owningSet;
 
        internal Bucket( BucketSet owningSet )
        { 
            this.timedOut = false; 
            this.index = -1;
            this.size = 1024; // A possible design change here is to have this scale dynamically based on load. 
            transactions = new InternalTransaction [this.size];
            this.owningSet = owningSet;
        }
 

        internal bool Add( InternalTransaction tx ) 
        { 
            int currentIndex = Interlocked.Increment( ref this.index );
            if( currentIndex < this.size ) 
            {
                tx.tableBucket = this;
                tx.bucketIndex = currentIndex;
                Thread.MemoryBarrier(); // This data must be written before the transaction 
                                        // could be timed out.
                this.transactions [currentIndex] = tx; 
 
                if( this.timedOut )
                { 
                    lock( tx )
                    {
                        tx.State.Timeout( tx );
                    } 
                }
            } 
            else 
            {
                Bucket newBucket = new Bucket( this.owningSet ); 
                newBucket.nextBucketWeak = new WeakReference( this );

                Bucket oldBucket = Interlocked.CompareExchange( ref this.owningSet.headBucket, newBucket, this );
                if( oldBucket == this ) 
                {
                    // ladies and gentlemen we have a winner. 
                    this.previous = newBucket; 
                }
 
                return false;
            }
            return true;
        } 

 
        internal void Remove( InternalTransaction tx ) 
        {
            this.transactions [tx.bucketIndex] = null; 
        }


        internal void TimeoutTransactions() 
        {
            int i; 
            int transactionCount = this.index; 

            this.timedOut = true; 
            Thread.MemoryBarrier();

            for( i = 0; i <= transactionCount && i < this.size; i++ )
            { 
                Debug.Assert( transactionCount == this.index, "Index changed timing out transactions" );
                InternalTransaction tx = this.transactions [i]; 
                if( tx != null ) 
                {
                    lock( tx ) 
                    {
                        tx.State.Timeout( tx );
                    }
                } 
            }
        } 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK