ActivatedMessageQueue.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / SMSvcHost / System / ServiceModel / Activation / ActivatedMessageQueue.cs / 1 / ActivatedMessageQueue.cs

                            //------------------------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------

namespace System.ServiceModel.Activation 
{
    using System; 
    using System.Threading; 
    using System.Collections;
    using System.Diagnostics; 
    using System.ServiceModel;
    using System.ServiceModel.Channels;

    class ActivatedMessageQueue : MessageQueue, IActivatedMessageQueue 
    {
        const int ThrottlingMaxSkewInMilliseconds = 5000; 
        static TimeSpan FailureThrottlingTimeout = TimeSpan.FromSeconds(15); 

        App app; 
        bool enabled;
        ListenerAdapter listenerAdapter;
        static int listenerChannelIdCounter;
        static Hashtable listenerChannelIds = new Hashtable(); 
        ListenerChannelContext listenerChannelContext;
        QueueState queueState; 
        object syncRoot = new object(); 

        // Used for failure throttling. 
        int listenerChannelFailCount;
        IOThreadTimer throttlingTimer;

        internal ActivatedMessageQueue(ListenerAdapter listenerAdapter, App app) 
            : base()
        { 
            Debug.Print("ActivatedMessageQueue.ctor(listenerAdapter:" + listenerAdapter + " appKey:" + app.AppKey + " appPoolId:" + app.AppPool.AppPoolId + ")"); 
            this.listenerAdapter = listenerAdapter;
            this.app = app; 
            this.queueState = QueueState.PendingOpen;

            CreateListenerChannelContext();
        } 

        void CreateListenerChannelContext() 
        { 
            listenerChannelContext = new ListenerChannelContext(this.app.AppKey,
                Interlocked.Increment(ref listenerChannelIdCounter), Guid.NewGuid()); 

            listenerChannelIds[listenerChannelContext.ListenerChannelId] = this;
        }
 
        public App App { get { return app; } }
        public ListenerChannelContext ListenerChannelContext { get { return listenerChannelContext; } } 
        public void Delete() 
        {
            SetEnabledState(false); 
            Close();
        }

        internal static ActivatedMessageQueue Find(int listenerChannelId) { return listenerChannelIds[listenerChannelId] as ActivatedMessageQueue; } 
        object ThisLock { get { return syncRoot; } }
        protected override bool CanShare { get { return true; } } 
 
        internal override bool CanDispatch
        { 
            get
            {
                return
                    base.CanDispatch && 
                    enabled &&
                    queueState != QueueState.Faulted && 
                    listenerAdapter.CanDispatch && 
                       (TransportType == TransportType.Tcp && !SMSvcHost.IsTcpActivationPaused
                        || TransportType == TransportType.NamedPipe && !SMSvcHost.IsNamedPipeActivationPaused) && 
                    app.AppPool.Enabled;
            }
        }
 
        // Return true if it's faulted.
        bool OnListenerChannelFailed() 
        { 
            lock (ThisLock)
            { 
                // Increment the count.
                listenerChannelFailCount++;

                if (listenerChannelFailCount <= 6) 
                {
                    return false; 
                } 

                listenerChannelFailCount = 0; 
            }

            FaultMessageQueueOnFailure();
            return true; 
        }
 
        void FaultMessageQueueOnFailure() 
        {
            lock (ThisLock) 
            {
                this.queueState = QueueState.Faulted;

                // Drop pending messages. 
                this.DropPendingMessages(true);
 
                // Throttling 
                if (throttlingTimer == null)
                { 
                    throttlingTimer = new IOThreadTimer(new WaitCallback(ThrottlingCallback),
                        this, true, ThrottlingMaxSkewInMilliseconds);
                }
 
                throttlingTimer.Set(FailureThrottlingTimeout);
            } 
        } 

        void ThrottlingCallback(object state) 
        {
            lock (ThisLock)
            {
                this.queueState = QueueState.PendingOpen; 
                listenerChannelFailCount = 0;
            } 
        } 

        public void LaunchQueueInstance() 
        {
            lock (ThisLock)
            {
                if (this.queueState == QueueState.Faulted) 
                {
                    return; 
                } 
                else if (this.queueState == QueueState.OpenedPendingConnect)
                { 
                    // We treat this as error case.
                    if (this.OnListenerChannelFailed())
                    {
                        return; 
                    }
                } 
 
                this.queueState = QueueState.PendingOpen;
            } 

            if (this.PendingCount > 0)
            {
                EnsureListenerChannelInstanceOpened(); 
            }
        } 
 
        internal static ListenerExceptionStatus Register(int listenerChannelId, Guid token, WorkerProcess worker)
        { 
            Debug.Print("ActivatedMessageQueue.Register() listenerChannelId: " + listenerChannelId + " token: " + token + " worker: " + worker.ProcessId);

            ActivatedMessageQueue thisPtr = null;
            lock (listenerChannelIds) 
            {
                thisPtr = Find(listenerChannelId); 
                if (thisPtr == null) 
                {
                    // this is an error. 
                    return ListenerExceptionStatus.InvalidArgument;
                }

                if (!token.Equals(thisPtr.listenerChannelContext.Token)) 
                {
                    return ListenerExceptionStatus.InvalidArgument; 
                } 
            }
 
            thisPtr.OnListenerChannelConnected();
            thisPtr.OnNewWorkerAvailable(worker);
            return ListenerExceptionStatus.Success;
        } 

        void OnListenerChannelConnected() 
        { 
            lock (ThisLock)
            { 
                // Clear the failure count.
                this.listenerChannelFailCount = 0;
                this.queueState = QueueState.Connected;
            } 
        }
 
        public void SetEnabledState(bool enabled) 
        {
            if (this.enabled != enabled) 
            {
                this.enabled = enabled;

                if (enabled) 
                {
                    IncrementRegistrationsActiveCounters(); 
                } 
                else
                { 
                    DecrementRegistrationsActiveCounters();
                    DropPendingMessages(true);
                }
            } 
        }
 
        protected override void OnSessionEnqueued() 
        {
            // Make sure that the ListenerChannelInstance is opened for new requests. 
            EnsureListenerChannelInstanceOpened();
        }

        protected override void OnRegisterCompleted() 
        {
            this.queueState = QueueState.PendingOpen; 
        } 

        protected override void OnUnregisterCompleted() 
        {
            this.queueState = QueueState.PendingOpen;
        }
 
        void EnsureListenerChannelInstanceOpened()
        { 
            lock (ThisLock) 
            {
                if (this.queueState != QueueState.PendingOpen) 
                {
                    return;
                }
 
                this.queueState = QueueState.OpenedPendingConnect;
            } 
 
            if (!listenerAdapter.OpenListenerChannelInstance(this))
            { 
                FaultMessageQueueOnFailure();
            }
        }
 
        bool IActivatedMessageQueue.HasStartedQueueInstances
        { 
            get 
            {
                return this.queueState == QueueState.Connected; 
            }
        }

        void IActivatedMessageQueue.OnQueueInstancesStopped() 
        {
            lock (ThisLock) 
            { 
                this.queueState = QueueState.PendingOpen;
            } 
        }

        protected override void OnUnregisterLastWorker()
        { 
        }
 
        ListenerExceptionStatus IActivatedMessageQueue.Register(BaseUriWithWildcard url) 
        {
            return base.Register(url); 
        }

        void IActivatedMessageQueue.UnregisterAll()
        { 
            base.UnregisterAll();
        } 
 
        enum QueueState
        { 
            Faulted,
            PendingOpen,
            OpenedPendingConnect,
            Connected 
        }
    } 
} 


// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK