MsmqActivation.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / SMSvcHost / System / ServiceModel / Activation / MsmqActivation.cs / 1 / MsmqActivation.cs

                            //------------------------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------

namespace System.ServiceModel.Activation 
{
    using System.ServiceModel.Activation.Diagnostics; 
    using System; 
    using System.Collections;
    using System.Collections.Generic; 
    using System.Diagnostics;
    using System.Net;
    using System.ServiceModel;
    using System.ServiceModel.Channels; 
    using System.ServiceProcess;
    using System.Threading; 
    using System.Messaging; 
    using System.ServiceModel.Diagnostics;
    using MQMessageQueue = System.Messaging.MessageQueue; 
    using MQMessage = System.Messaging.Message;
    using MQException = System.Messaging.MessageQueueException;

    class MsmqActivation : ServiceBase 
    {
        BindingsManager bindings; 
 
        ActivationService integrationActivationService;
        ListenerAdapter integrationListenerAdapter; 

        ActivationService transportActivationService;
        ListenerAdapter transportListenerAdapter;
 
        // Sets the service name and the service-control capabilities, then builds
        // the shared bindings manager and one activation service for each of the
        // two MSMQ address-translator schemes (FormatName and net.msmq).
        public MsmqActivation()
        {
            this.ServiceName = ListenerConstants.MsmqActivationServiceName;

            // Event-log reporting is done explicitly (see OnStart), so the
            // automatic logging provided by the base class is switched off.
            this.AutoLog = false;
            this.CanHandlePowerEvent = false;

            // Commands this service accepts from the service controller.
            this.CanPauseAndContinue = true;
            this.CanShutdown = true;
            this.CanStop = true;

            // Must be assigned before the activation services are constructed:
            // each ActivationService constructor reads it from this instance.
            BindingsManager sharedBindings = new BindingsManager();
            this.bindings = sharedBindings;

            string integrationScheme = MsmqUri.FormatNameAddressTranslator.Scheme;
            this.integrationActivationService = new ActivationService(this, integrationScheme);

            string transportScheme = MsmqUri.NetMsmqAddressTranslator.Scheme;
            this.transportActivationService = new ActivationService(this, transportScheme);
        }

        // Service-control handler for Start. Traces the request when information
        // tracing is enabled and then runs Start(). Any exception is written to
        // the event log as ServiceStartFailed and rethrown to the caller.
        //
        // args: start arguments supplied by the service controller (unused here).
        protected override void OnStart(string[] args)
        {
            try
            {
                bool traceStart = DiagnosticUtility.ShouldTraceInformation;
                if (traceStart)
                {
                    ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceStart, this);
                }

#if DEBUG
                // Debug builds may defer the real start-up to a separate thread so
                // that this handler returns immediately.
                bool startDeferred = DebuggableService.DelayStart(ServiceName);
                if (startDeferred)
                {
                    Thread starter = new Thread(new ThreadStart(Start));
                    starter.Start();
                    return;
                }
#endif
                Start();
            }
            catch (Exception startFailure)
            {
                // Record the failure in the event log before letting it propagate.
                string failureDetails = startFailure.ToString();
                ListenerTraceUtility.EventLog.LogEvent(
                    TraceEventType.Error,
                    EventLogCategory.ListenerAdapter,
                    EventLogEventId.ServiceStartFailed,
                    false,
                    failureDetails);

                throw;
            }
        }

        // Service-control handler for Stop: emits the ServiceStop trace when
        // information tracing is enabled, then closes the listener adapters.
        protected override void OnStop()
        {
            bool traceStop = DiagnosticUtility.ShouldTraceInformation;
            if (traceStop)
            {
                ListenerTraceUtility.TraceEvent(
                    TraceEventType.Information,
                    TraceCode.ServiceStop,
                    this);
            }

            this.Shutdown();
        }
 
        // Service-control handler for Continue: emits the ServiceContinue trace
        // when information tracing is enabled, then clears the paused flag on the
        // integration activation service followed by the transport one.
        protected override void OnContinue()
        {
            bool traceContinue = DiagnosticUtility.ShouldTraceInformation;
            if (traceContinue)
            {
                ListenerTraceUtility.TraceEvent(
                    TraceEventType.Information,
                    TraceCode.ServiceContinue,
                    this);
            }

            // Same order as the individual assignments: integration, then transport.
            ActivationService[] services =
            {
                this.integrationActivationService,
                this.transportActivationService
            };

            foreach (ActivationService activationService in services)
            {
                activationService.Paused = false;
            }
        }

        // Service-control handler for Pause: emits the ServicePause trace when
        // information tracing is enabled, then sets the paused flag on the
        // integration activation service followed by the transport one.
        protected override void OnPause()
        {
            bool tracePause = DiagnosticUtility.ShouldTraceInformation;
            if (tracePause)
            {
                ListenerTraceUtility.TraceEvent(
                    TraceEventType.Information,
                    TraceCode.ServicePause,
                    this);
            }

            // Same order as the individual assignments: integration, then transport.
            ActivationService[] services =
            {
                this.integrationActivationService,
                this.transportActivationService
            };

            foreach (ActivationService activationService in services)
            {
                activationService.Paused = true;
            }
        }

        // Service-control handler for system shutdown: emits the ServiceShutdown
        // trace when information tracing is enabled, closes the listener adapters
        // and then asks the base class to stop the service.
        protected override void OnShutdown()
        {
            bool traceShutdown = DiagnosticUtility.ShouldTraceInformation;
            if (traceShutdown)
            {
                ListenerTraceUtility.TraceEvent(
                    TraceEventType.Information,
                    TraceCode.ServiceShutdown,
                    this);
            }

            this.Shutdown();
            this.Stop();
        }

        // Performs the actual start-up work: verifies that SMSvcHost reports web
        // hosting as supported, then creates and opens one listener adapter per
        // activation service.
        //
        // Throws NotSupportedException (wrapped by ThrowHelperError) and sets
        // ExitCode to ERROR_NOT_SUPPORTED when web hosting is not supported.
        void Start()
        {
#if DEBUG
            DebuggableService.WaitForDebugger(this.ServiceName);
#endif
            const int ERROR_NOT_SUPPORTED = 50;

            bool webhostSupported = SMSvcHost.IsWebhostSupported;
            if (!webhostSupported)
            {
                // Publish the exit code before raising the failure.
                this.ExitCode = ERROR_NOT_SUPPORTED;

                NotSupportedException notSupported =
                    new NotSupportedException(SR.GetString(SR.ServiceRequiresWas));
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(notSupported);
            }

            // Both adapters are created before either one is opened.
            this.integrationListenerAdapter = new ListenerAdapter(this.integrationActivationService);
            this.transportListenerAdapter = new ListenerAdapter(this.transportActivationService);

            this.integrationListenerAdapter.Open();
            this.transportListenerAdapter.Open();
        }
 
        // Closes the integration and transport listener adapters.
        //
        // The adapters are only created inside Start(), so either field can still
        // be null when this runs (for example when Start() failed, or has not yet
        // completed on the DEBUG delayed-start thread). Missing adapters are
        // skipped instead of raising a NullReferenceException, and the transport
        // adapter is closed even when closing the integration adapter throws.
        void Shutdown()
        {
            ListenerAdapter integrationAdapter = this.integrationListenerAdapter;
            ListenerAdapter transportAdapter = this.transportListenerAdapter;

            try
            {
                if (integrationAdapter != null)
                {
                    integrationAdapter.Close();
                }
            }
            finally
            {
                if (transportAdapter != null)
                {
                    transportAdapter.Close();
                }
            }
        }

        // Owns one MsmqBindingMonitor per host name and routes binding filters to
        // them. All access to the monitor table is serialized on a private lock.
        class BindingsManager
        {
            // Host name -> monitor for that host; host names compare
            // case-insensitively (ordinal).
            Dictionary<string, MsmqBindingMonitor> bindingMonitors;
            object thisLock = new object();

            public BindingsManager()
            {
                this.bindingMonitors = new Dictionary<string, MsmqBindingMonitor>(StringComparer.OrdinalIgnoreCase);
            }

            // Ensures a monitor exists (and has been opened) for the given host and
            // that the filter is registered with it exactly once.
            //
            // host:   host name whose queues should be monitored.
            // filter: filter to add to that host's monitor if not already present.
            public void RegisterBindingFilterIfNecessary(string host, MsmqBindingFilter filter)
            {
                lock (this.thisLock)
                {
                    MsmqBindingMonitor bindingMonitor;
                    if (!this.bindingMonitors.TryGetValue(host, out bindingMonitor))
                    {
                        // First filter for this host: create, record and open its monitor.
                        bindingMonitor = new MsmqBindingMonitor(host);
                        this.bindingMonitors.Add(host, bindingMonitor);

                        bindingMonitor.Open();
                    }

                    // register the new filter if it doesn't already exist:
                    if (!bindingMonitor.ContainsFilter(filter))
                    {
                        bindingMonitor.AddFilter(filter);
                    }
                }
            }

            // Removes the filter from every known monitor; the filter is not tracked
            // per host, so all monitors are asked to drop it.
            public void UnregisterBindingFilter(MsmqBindingFilter filter)
            {
                lock (this.thisLock)
                {
                    foreach (MsmqBindingMonitor monitor in this.bindingMonitors.Values)
                    {
                        monitor.RemoveFilter(filter);
                    }
                }
            }
        }

        // IActivationService implementation for one protocol scheme. Tracks the
        // queue monitor groups created for that protocol, keyed by listener
        // channel id, and carries the service-wide paused flag.
        class ActivationService : IActivationService
        {
            // Listener channel id -> group. Every read and write of this table is
            // performed while holding thisLock.
            Dictionary<int, QueueMonitorGroup> groups;
            string protocol;
            BindingsManager bindings;
            object thisLock = new object();
            ServiceBase service;
            bool paused;

            // service:  owning Windows service; also supplies the shared BindingsManager.
            // protocol: protocol (URI scheme) name reported through ProtocolName.
            public ActivationService(MsmqActivation service, string protocol)
            {
                this.protocol = protocol;
                this.bindings = service.bindings;
                this.service = service;
                this.paused = false;

                this.groups = new Dictionary<int, QueueMonitorGroup>();
            }

            // Gets or sets whether dispatching is paused. Changing the value from
            // paused to running restarts every registered group.
            public bool Paused
            {
                get { return this.paused; }
                set
                {
                    // Use the same lock that guards every mutation of 'groups'
                    // (CreateQueue / QueueMonitorGroupClosed); enumerating the table
                    // under a different lock could race with those mutations.
                    lock (this.thisLock)
                    {
                        if (this.paused == value)
                        {
                            return;
                        }

                        this.paused = value;

                        if (!this.paused)
                        {
                            foreach (QueueMonitorGroup group in this.groups.Values)
                            {
                                group.Start();
                            }
                        }
                    }
                }
            }

            // Bindings manager shared with the owning service.
            public BindingsManager Bindings
            {
                get { return this.bindings; }
            }

            // Name of the owning Windows service.
            public string ActivationServiceName
            {
                get
                {
                    return this.service.ServiceName;
                }
            }

            // Protocol name passed to the constructor.
            public string ProtocolName
            {
                get { return this.protocol; }
            }

            // Creates a queue monitor group for the application and records it under
            // its listener channel id (replacing any existing entry with that id).
            public IActivatedMessageQueue CreateQueue(ListenerAdapter la, App app)
            {
                QueueMonitorGroup qmg = new QueueMonitorGroup(this, la, app);
                lock (this.thisLock)
                {
                    this.groups[qmg.ListenerChannelContext.ListenerChannelId] = qmg;
                }
                return qmg;
            }

            // Returns the group registered under queueId, or null when there is none.
            public IActivatedMessageQueue FindQueue(int queueId)
            {
                lock (this.thisLock)
                {
                    QueueMonitorGroup group;
                    this.groups.TryGetValue(queueId, out group);
                    return group;
                }
            }

            // Asks the owning Windows service to stop.
            public void StopService()
            {
                this.service.Stop();
            }

            // Forgets a group that is being deleted.
            public void QueueMonitorGroupClosed(QueueMonitorGroup qmg)
            {
                lock (this.thisLock)
                {
                    this.groups.Remove(qmg.ListenerChannelContext.ListenerChannelId);
                }
            }
        }

        // IActivatedMessageQueue implementation for one application. Owns the
        // QueueMonitor instances for every queue matched by the application's
        // binding filter, and turns "message available" notifications into
        // listener channel instance launches. Mutable state is guarded by
        // lock (this).
        class QueueMonitorGroup : IActivatedMessageQueue
        {
            // Source of listener channel ids; incremented atomically per group.
            static int queueIdCounter = 0;
            // Delay before monitors that failed to peek are started again.
            static readonly TimeSpan RetryMonitorInterval = TimeSpan.FromMinutes(5);

            ActivationService activationService;
            App app;
            ActivationBindingFilter filter;
            ListenerAdapter listenerAdapter;
            // Number of instance launches that may be performed on the next notifications.
            int startQueueInstanceCount;
            ListenerChannelContext listenerChannelContext;
            // Monitors currently owned by this group.
            List<QueueMonitor> monitors = new List<QueueMonitor>();
            // Monitors waiting for the retry timer.
            List<QueueMonitor> failedMonitors = new List<QueueMonitor>();
            bool enabled;
            // Notifications received while no launch could be performed.
            int pendingNotificationCount;
            IOThreadTimer retryTimer;
            // True while the retry timer is armed and has not fired yet.
            bool retryScheduled = false;
            bool hasStartedQueueInstances;

            // activationService: owning activation service (paused state, bindings).
            // la:                listener adapter used to open listener channel instances.
            // app:               application whose queues are monitored.
            public QueueMonitorGroup(ActivationService activationService, ListenerAdapter la, App app)
            {
                this.activationService = activationService;
                this.listenerAdapter = la;
                this.app = app;
                this.startQueueInstanceCount = 1;
                this.listenerChannelContext = new ListenerChannelContext(app.AppKey,
                    Interlocked.Increment(ref queueIdCounter), Guid.Empty);

                this.pendingNotificationCount = 0;
                this.filter = new ActivationBindingFilter(this, app.Path);
                this.retryTimer = new IOThreadTimer(OnRetryTimer, null, false);
            }

            // True when the group is enabled and the activation service is not paused.
            public bool CanDispatch
            {
                get { return this.enabled && !this.activationService.Paused; }
            }

            public App App
            {
                get { return this.app; }
            }

            public ListenerChannelContext ListenerChannelContext
            {
                get { return this.listenerChannelContext; }
            }

            bool IActivatedMessageQueue.HasStartedQueueInstances
            {
                get { return this.hasStartedQueueInstances; }
            }

            void IActivatedMessageQueue.OnQueueInstancesStopped()
            {
                this.hasStartedQueueInstances = false;
            }

            // Removes this group from its activation service and releases all monitors.
            public void Delete()
            {
                this.activationService.QueueMonitorGroupClosed(this);
                UnregisterAll();
            }

            // Launches one queue instance if a notification is pending; otherwise
            // records that one more launch is allowed and makes sure every monitor
            // is peeking for new messages.
            public void LaunchQueueInstance()
            {
                bool startInstance = false;

                lock (this)
                {
                    if (this.pendingNotificationCount > 0)
                    {
                        this.pendingNotificationCount--;
                        startInstance = true;
                    }
                    else
                    {
                        // start monitoring for new messages...
                        startQueueInstanceCount++;

                        // Make sure that everyone is peeking:
                        foreach (QueueMonitor monitor in this.monitors)
                        {
                            monitor.Start();
                        }
                    }
                }

                // The listener adapter is called outside the lock.
                if (startInstance)
                {
                    if (this.listenerAdapter.OpenListenerChannelInstance(this))
                    {
                        this.hasStartedQueueInstances = true;
                    }
                }
            }

            // Registers this group's binding filter for the host of the given URL.
            // Always returns ListenerExceptionStatus.Success.
            public ListenerExceptionStatus Register(BaseUriWithWildcard url)
            {
                this.activationService.Bindings.RegisterBindingFilterIfNecessary(url.BaseAddress.Host, this.filter);
                return ListenerExceptionStatus.Success;
            }

            // Starts every owned monitor, but only while dispatching is possible.
            public void Start()
            {
                lock (this)
                {
                    if (this.CanDispatch)
                    {
                        // Ensure that we're started...
                        foreach (QueueMonitor monitor in this.monitors)
                        {
                            monitor.Start();
                        }
                    }
                }
            }

            // Updates the enabled flag; on a change, (re)starts the monitors, which
            // is a no-op when the group cannot dispatch.
            public void SetEnabledState(bool enabled)
            {
                lock (this)
                {
                    if (this.enabled != enabled)
                    {
                        this.enabled = enabled;
                        Start();
                    }
                }
            }

            // Disposes and forgets every monitor and unregisters the binding filter.
            public void UnregisterAll()
            {
                lock (this)
                {
                    foreach (QueueMonitor monitor in this.monitors)
                    {
                        monitor.Dispose();
                    }
                    this.monitors.Clear();

                    this.activationService.Bindings.UnregisterBindingFilter(this.filter);
                }
            }

            // Called by a monitor when a message is available on its queue.
            // Returns true when the monitor should keep peeking (more launches are
            // still allowed), false otherwise.
            public bool NotifyMessageAvailable()
            {
                bool startInstance = false;
                bool shouldContinue = false;

                lock (this)
                {
                    if (!this.CanDispatch)
                    {
                        this.pendingNotificationCount++;
                    }
                    else if (this.startQueueInstanceCount == 0)
                    {
                        this.pendingNotificationCount++;
                    }
                    else
                    {
                        this.startQueueInstanceCount--;
                        startInstance = true;
                        shouldContinue = this.startQueueInstanceCount > 0;
                    }
                }

                if (startInstance)
                {
                    MsmqDiagnostics.StartingApplication(this.app.Path);

                    // Mirror LaunchQueueInstance: only report started instances when
                    // the listener adapter actually opened one.
                    if (this.listenerAdapter.OpenListenerChannelInstance(this))
                    {
                        this.hasStartedQueueInstances = true;
                    }
                }
                return shouldContinue;
            }

            // Records a monitor that failed to peek and arms the retry timer if it
            // is not already armed.
            public void ScheduleRetry(QueueMonitor monitor)
            {
                lock (this)
                {
                    // A monitor only needs to be restarted once per timer tick.
                    if (!this.failedMonitors.Contains(monitor))
                    {
                        this.failedMonitors.Add(monitor);
                    }

                    if (!this.retryScheduled)
                    {
                        this.retryTimer.Set(RetryMonitorInterval);
                        this.retryScheduled = true;
                    }
                }
            }

            // Creates a monitor for the queue, adds it to the group and starts it
            // when the group is enabled. The returned object is handed back later
            // to RemoveQueueFromGroup.
            object AddQueueToGroup(Uri queue)
            {
                QueueMonitor monitor = null;
                lock (this)
                {
                    monitor = new QueueMonitor(queue, this);
                    this.monitors.Add(monitor);
                    if (this.enabled)
                    {
                        monitor.Start();
                    }
                }

                return monitor;
            }

            // Retry timer callback: restarts the failed monitors that this group
            // still owns (only while enabled).
            void OnRetryTimer(object state)
            {
                lock (this)
                {
                    // Take a snapshot and reset the bookkeeping before restarting
                    // anything. monitor.Start() may fail again and re-enter
                    // ScheduleRetry on this thread; that call must be able to add to
                    // failedMonitors without invalidating an enumeration, must not
                    // have its entry wiped afterwards, and must be able to re-arm
                    // the timer.
                    QueueMonitor[] retrying = this.failedMonitors.ToArray();
                    this.failedMonitors.Clear();
                    this.retryScheduled = false;

                    if (this.enabled)
                    {
                        foreach (QueueMonitor monitor in retrying)
                        {
                            // Only start it if we still own it...
                            if (this.monitors.Contains(monitor))
                            {
                                monitor.Start();
                            }
                        }
                    }
                }
            }

            // Removes and disposes the monitor previously returned by AddQueueToGroup.
            void RemoveQueueFromGroup(object state)
            {
                QueueMonitor monitor = (QueueMonitor)state;
                lock (this)
                {
                    this.monitors.Remove(monitor);
                    monitor.Dispose();
                }
            }

            // Note that we inherit from the transport binding filter here - that's not
            // a big deal, because we never need these uris to create services.
            class ActivationBindingFilter : MsmqBindingFilter
            {
                QueueMonitorGroup group;

                public ActivationBindingFilter(QueueMonitorGroup group, string path)
                    : base(path, MsmqUri.NetMsmqAddressTranslator)
                {
                    this.group = group;
                }

                // A matching queue appeared: start monitoring it. The returned
                // monitor is the callback state later passed to MatchLost.
                public override object MatchFound(string host, string name, bool isPrivate)
                {
                    MsmqDiagnostics.MatchedApplicationFound(host, name, isPrivate, this.CanonicalPrefix);
                    return this.group.AddQueueToGroup(CreateServiceUri(host, name, isPrivate));
                }

                // A previously matched queue is gone: stop monitoring it.
                public override void MatchLost(string host, string name, bool isPrivate, object callbackState)
                {
                    this.group.RemoveQueueFromGroup(callbackState);
                }
            }
        }

        // Watches a single MSMQ queue by keeping an asynchronous peek outstanding
        // and notifying the owning group whenever a message becomes available.
        class QueueMonitor : IDisposable
        {
            static readonly TimeSpan InfiniteTimeout = TimeSpan.FromMilliseconds(UInt32.MaxValue);

            bool disposed;
            QueueMonitorGroup group;
            // True while an asynchronous peek is outstanding (or being re-issued).
            bool peeking;
            string queueName;
            // Created lazily by the first Start().
            MQMessageQueue queue;

            // uri:   service URI of the queue; converted to a System.Messaging queue name.
            // group: owning group that receives notifications and retry requests.
            public QueueMonitor(Uri uri, QueueMonitorGroup group)
            {
                // The defaults don't really matter here - we don't use
                // the buffer manager.
                this.group = group;
                this.queueName = MsmqFormatName.ToSystemMessagingQueueName(MsmqUri.UriToFormatNameByScheme(uri));
                this.peeking = false;
                Debug.Print("opening queue: " + this.queueName);
            }

            // Opens the queue if necessary and begins an asynchronous peek unless
            // one is already outstanding. MSMQ failures are not propagated; the
            // monitor is handed to the group for a later retry instead.
            public void Start()
            {
                lock (this)
                {
                    try
                    {
                        if (this.queue == null)
                        {
                            this.queue = new MQMessageQueue(this.queueName);
                            // Only the lookup id is read from peeked messages.
                            this.queue.MessageReadPropertyFilter.ClearAll();
                            this.queue.MessageReadPropertyFilter.LookupId = true;
                        }

                        if (!this.peeking)
                        {
                            this.peeking = true;
                            this.queue.BeginPeek(InfiniteTimeout, null, DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnPeekCompleted)));
                        }
                    }
                    catch (MQException)
                    {
                        // No peek is outstanding after a failure. Without clearing
                        // the flag the retry's Start() would skip BeginPeek and the
                        // monitor would never resume.
                        this.peeking = false;
                        this.group.ScheduleRetry(this);
                    }
                }
            }

            // Marks the monitor as disposed and releases the queue, if one was opened.
            public void Dispose()
            {
                lock (this)
                {
                    this.disposed = true;
                    if (this.queue != null)
                    {
                        this.queue.Dispose();
                    }
                }
            }

            // Completion callback for BeginPeek: notifies the group and, if the
            // group wants more notifications and the monitor is still alive, issues
            // the next peek.
            void OnPeekCompleted(IAsyncResult result)
            {
                bool shouldContinue = true;
                try
                {
                    // The peeked message itself is not needed; completing the peek
                    // only confirms that a message is available.
                    this.queue.EndPeek(result);

                    Debug.Print("MsmqActivation.QueueMonitor.OnPeekCompleted: message available");
                    shouldContinue = this.group.NotifyMessageAvailable();
                }
                catch (MQException ex)
                {
                    // Clear the flag first so the scheduled retry can peek again.
                    // The monitor lock is released before calling into the group.
                    lock (this)
                    {
                        this.peeking = false;
                    }

                    MsmqDiagnostics.CannotPeekOnQueue(this.queue.FormatName, ex);
                    this.group.ScheduleRetry(this);
                    return;
                }
                catch (Exception ex)
                {
                    // The peek chain is broken here as well; allow a later Start()
                    // to re-issue it.
                    lock (this)
                    {
                        this.peeking = false;
                    }

                    if (DiagnosticUtility.ShouldTraceError)
                    {
                        DiagnosticUtility.ExceptionUtility.TraceHandledException(ex, TraceEventType.Error);
                    }

                    if (!DiagnosticUtility.IsFatal(ex))
                    {
                        this.group.ScheduleRetry(this);
                    }

                    throw;
                }

                lock (this)
                {
                    if (!this.disposed && shouldContinue)
                    {
                        this.queue.BeginPeek(InfiniteTimeout, null, DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnPeekCompleted)));
                    }
                    else
                    {
                        this.peeking = false;
                    }
                }
            }
        }
    }
} 
 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK