Msmq4PoisonHandler.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / Msmq4PoisonHandler.cs / 1 / Msmq4PoisonHandler.cs

                            //------------------------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
namespace System.ServiceModel.Channels
{ 
    using System.Collections.Generic;
    using System.IO; 
    using System.Threading; 
    using System.Transactions;
 
    sealed class Msmq4PoisonHandler : IPoisonHandlingStrategy
    {
        // Queue messages are received from; assigned from receiver.Queue in Open().
        MsmqQueue mainQueue;
        // Main queue opened with MQ_MOVE_ACCESS; OnTimer moves retry-queue messages into it.
        MsmqQueue mainQueueForMove; 
        // Retry queue opened with MQ_RECEIVE_ACCESS; used for the asynchronous peek and as
        // the source of the move performed in OnTimer.
        MsmqQueue retryQueueForPeek;
        // Retry queue opened with MQ_MOVE_ACCESS; destination for messages that get another retry cycle.
        MsmqQueue retryQueueForMove; 
        // Poison queue opened with MQ_MOVE_ACCESS; destination for ReceiveErrorHandling.Move.
        MsmqQueue poisonQueue; 

        // Invokes OnTimer; armed by OnPeekCompleted when the peeked message is not yet due.
        IOThreadTimer timer; 
        // Receive helper this handler works for; source of the queue, listen URI and receive parameters.
        MsmqReceiveHelper receiver;

        // Set under lock(this) in Dispose; checked under that lock in StartPeek,
        // CheckAndHandlePoisonMessage, OnPeekCompleted and OnTimer.
        bool disposed;
 
        // Format names computed in the constructor from the listen URI
        // (the poison and retry names use the URI with ";poison" / ";retry" appended).
        string poisonQueueName;
        string retryQueueName; 
        string mainQueueName; 

        // Single property buffer passed to every BeginPeek on the retry queue.
        MsmqRetryQueueMessage retryQueueMessage; 
        // Delegates created once and shared by all handler instances.
        static WaitCallback onStartPeek = new WaitCallback(StartPeek);
        static AsyncCallback onPeekCompleted = DiagnosticUtility.ThunkAsyncCallback(OnPeekCompleted);

        /// <summary>
        /// Creates a poison handler for <paramref name="receiver"/> and precomputes the format
        /// names of the main queue and of its ";poison" and ";retry" companion queues.
        /// No MsmqQueue objects are created here; that happens in Open().
        /// </summary>
        /// <param name="receiver">Receive helper supplying the listen URI and receive parameters.</param>
        public Msmq4PoisonHandler(MsmqReceiveHelper receiver)
        {
            // Must come first: the ReceiveParameters and ListenUri properties read through this field.
            this.receiver = receiver;
            this.disposed = false;
            this.timer = new IOThreadTimer(OnTimer, null, false);

            this.mainQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(this.ListenUri);

            // The sub-queue addresses are the listen address with a suffix appended.
            Uri poisonUri = new Uri(this.ListenUri.AbsoluteUri + ";poison");
            this.poisonQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(poisonUri);

            Uri retryUri = new Uri(this.ListenUri.AbsoluteUri + ";retry");
            this.retryQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(retryUri);
        }
 
        /// <summary>
        /// Gets the receive parameters exposed by the receiver (MsmqReceiveParameters).
        /// </summary>
        MsmqReceiveParameters ReceiveParameters
        {
            get
            {
                return this.receiver.MsmqReceiveParameters;
            }
        }
 
        /// <summary>
        /// Gets the listen URI exposed by the receiver.
        /// </summary>
        Uri ListenUri
        {
            get
            {
                return this.receiver.ListenUri;
            }
        }

        /// <summary>
        /// Picks up the receiver's queue, creates the queue objects used for moving and peeking
        /// (main, poison and retry queues) and starts the first peek on the retry queue.
        /// </summary>
        public void Open()
        {
            // The receive queue is obtained from the receiver; the remaining queue objects are created here.
            this.mainQueue = this.receiver.Queue;
            this.mainQueueForMove = new MsmqQueue(this.mainQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS);

            // Poison queue: target of the move performed for ReceiveErrorHandling.Move.
            this.poisonQueue = new MsmqQueue(this.poisonQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS);

            // Retry queue: one object for moving messages in, one for peeking them back out.
            this.retryQueueForMove = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS);
            this.retryQueueForPeek = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_RECEIVE_ACCESS);

            this.retryQueueMessage = new MsmqRetryQueueMessage();

            // Off the thread pool, hand the first peek to the IOThreadScheduler;
            // on a thread-pool thread, start it inline.
            if (!Thread.CurrentThread.IsThreadPoolThread)
            {
                IOThreadScheduler.ScheduleCallback(Msmq4PoisonHandler.onStartPeek, this);
                return;
            }

            StartPeek(this);
        }

        /// <summary>
        /// Starts an asynchronous peek on the retry queue (timeout TimeSpan.MaxValue) whose
        /// completion is delivered to OnPeekCompleted. Does nothing if the handler is disposed.
        /// </summary>
        /// <param name="state">The Msmq4PoisonHandler to start peeking for.</param>
        static void StartPeek(object state)
        {
            // Direct cast instead of "as": a state object of the wrong type fails right here
            // with InvalidCastException instead of reaching lock() as a silent null.
            Msmq4PoisonHandler handler = (Msmq4PoisonHandler)state;
            lock (handler)
            {
                if (handler.disposed)
                {
                    return;
                }

                handler.retryQueueForPeek.BeginPeek(handler.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, handler);
            }
        }
 
        /// <summary>
        /// Decides whether a received message has exhausted its receive retries and, if so,
        /// either moves it to the retry queue for another cycle or applies the final disposition.
        /// </summary>
        /// <param name="messageProperty">Properties of the received message (abort count, move count, lookup id).</param>
        /// <returns>
        /// false when the abort count is still within ReceiveRetryCount (nothing is done);
        /// true when the message was moved to the retry queue or given its final disposition.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The handler has been disposed.</exception>
        public bool CheckAndHandlePoisonMessage(MsmqMessageProperty messageProperty)
        {
            bool retriesExhausted = messageProperty.AbortCount > this.ReceiveParameters.ReceiveRetryCount;
            if (!retriesExhausted)
            {
                return false;
            }

            // NOTE(review): presumably one retry cycle corresponds to two moves
            // (main -> retry and retry -> main), hence the division by two; confirm.
            int completedCycles = messageProperty.MoveCount / 2;

            lock (this)
            {
                if (this.disposed)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(this.GetType().ToString()));
                }

                if (completedCycles < this.ReceiveParameters.MaxRetryCycles)
                {
                    // Cycles remain: park the message in the retry queue.
                    MsmqReceiveHelper.MoveReceivedMessage(this.mainQueue, this.retryQueueForMove, messageProperty.LookupId);
                    MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, false, this.receiver.InstanceId);
                }
                else
                {
                    // All retry cycles used up.
                    FinalDisposition(messageProperty);
                }
            }

            return true;
        }

        /// <summary>
        /// Applies the configured ReceiveErrorHandling to a message: move it to the poison
        /// queue, reject it, drop it, or abort the current transaction and fault the
        /// listener/channel.
        /// </summary>
        /// <param name="messageProperty">Properties of the message being disposed of.</param>
        public void FinalDisposition(MsmqMessageProperty messageProperty)
        {
            MsmqReceiveHelper owner = this.receiver;

            switch (this.ReceiveParameters.ReceiveErrorHandling)
            {
                case ReceiveErrorHandling.Move:
                {
                    MsmqReceiveHelper.MoveReceivedMessage(this.mainQueue, this.poisonQueue, messageProperty.LookupId);
                    MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, true, owner.InstanceId);
                    break;
                }

                case ReceiveErrorHandling.Reject:
                {
                    owner.DropOrRejectReceivedMessage(messageProperty, true);
                    MsmqDiagnostics.PoisonMessageRejected(messageProperty.MessageId, owner.InstanceId);
                    break;
                }

                case ReceiveErrorHandling.Drop:
                {
                    owner.DropOrRejectReceivedMessage(messageProperty, false);
                    break;
                }

                case ReceiveErrorHandling.Fault:
                {
                    MsmqReceiveHelper.TryAbortTransactionCurrent();

                    // Either, both or neither of listener and channel may be present.
                    if (owner.ChannelListener != null)
                    {
                        owner.ChannelListener.FaultListener();
                    }

                    if (owner.Channel != null)
                    {
                        owner.Channel.FaultChannel();
                    }

                    break;
                }

                default:
                {
                    DiagnosticUtility.DebugAssert("System.ServiceModel.Channels.Msmq4PoisonHandler.FinalDisposition(): (unexpected ReceiveErrorHandling)");
                    break;
                }
            }
        }

        /// <summary>
        /// Marks the handler disposed, cancels the retry timer and disposes the queue objects
        /// created in Open(). Calling it again is a no-op. The queue obtained from the
        /// receiver (mainQueue) is not disposed here.
        /// </summary>
        public void Dispose()
        {
            lock (this)
            {
                if (this.disposed)
                {
                    return;
                }

                this.disposed = true;
                this.timer.Cancel();

                // Any of these is still null if Open() did not get as far as creating it.
                if (this.retryQueueForPeek != null)
                {
                    this.retryQueueForPeek.Dispose();
                }

                if (this.retryQueueForMove != null)
                {
                    this.retryQueueForMove.Dispose();
                }

                if (this.poisonQueue != null)
                {
                    this.poisonQueue.Dispose();
                }

                if (this.mainQueueForMove != null)
                {
                    this.mainQueueForMove.Dispose();
                }
            }
        }

        /// <summary>
        /// Completion callback for the retry-queue peek. When a message was peeked and the
        /// handler is still alive, either moves the message back immediately (its retry delay
        /// has already elapsed) or arms the timer for the remaining delay.
        /// </summary>
        /// <param name="result">Async result whose AsyncState is the owning Msmq4PoisonHandler.</param>
        static void OnPeekCompleted(IAsyncResult result)
        {
            // Direct cast instead of "as": an unexpected AsyncState fails here with
            // InvalidCastException instead of a later null dereference.
            Msmq4PoisonHandler handler = (Msmq4PoisonHandler)result.AsyncState;

            MsmqQueue.ReceiveResult receiveResult = MsmqQueue.ReceiveResult.Unknown;
            try
            {
                receiveResult = handler.retryQueueForPeek.EndPeek(result);
            }
            catch (MsmqException ex)
            {
                // Logged as expected; receiveResult stays Unknown and is handled below.
                MsmqDiagnostics.ExpectedException(ex);
            }

            // Anything other than a peeked message ends here; no new peek is issued from this method.
            if (MsmqQueue.ReceiveResult.MessageReceived != receiveResult)
            {
                return;
            }

            lock (handler)
            {
                if (handler.disposed)
                {
                    return;
                }

                // Check the time - move it, and begin peeking again
                // if necessary, or wait for the timeout.
                DateTime lastMoveTime = MsmqDateTime.ToDateTime(handler.retryQueueMessage.LastMoveTime.Value);

                TimeSpan waitTime = lastMoveTime + handler.ReceiveParameters.RetryCycleDelay - DateTime.UtcNow;
                if (waitTime < TimeSpan.Zero)
                {
                    // Already overdue: run the timer body inline (the lock is re-entered by OnTimer).
                    handler.OnTimer(handler);
                }
                else
                {
                    handler.timer.Set(waitTime);
                }
            }
        }
 
        /// <summary>
        /// Moves the most recently peeked retry-queue message (identified by its lookup id)
        /// into the main queue and starts the next peek on the retry queue. Does nothing once
        /// the handler is disposed.
        /// </summary>
        /// <param name="state">Not used by this method.</param>
        void OnTimer(object state)
        {
            lock (this)
            {
                if (this.disposed)
                {
                    return;
                }

                try
                {
                    long lookupId = this.retryQueueMessage.LookupId.Value;
                    this.retryQueueForPeek.TryMoveMessage(lookupId, this.mainQueueForMove, MsmqTransactionMode.Single);
                }
                catch (MsmqException ex)
                {
                    // A failed move is logged as expected; peeking resumes regardless.
                    MsmqDiagnostics.ExpectedException(ex);
                }

                this.retryQueueForPeek.BeginPeek(this.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, this);
            }
        }

        /// <summary>
        /// Native message buffer used when peeking the retry queue; it carries only the
        /// lookup id and the last-move-time properties.
        /// </summary>
        class MsmqRetryQueueMessage : NativeMsmqMessage
        {
            readonly LongProperty lookupId;
            readonly IntProperty lastMoveTime;

            /// <summary>
            /// Creates the buffer and registers its two properties.
            /// </summary>
            // NOTE(review): base(2) presumably is the number of properties registered below; confirm.
            public MsmqRetryQueueMessage()
                : base(2)
            {
                this.lookupId = new LongProperty(this, UnsafeNativeMethods.PROPID_M_LOOKUPID);
                this.lastMoveTime = new IntProperty(this, UnsafeNativeMethods.PROPID_M_LAST_MOVE_TIME);
            }

            /// <summary>Gets the PROPID_M_LOOKUPID property.</summary>
            public LongProperty LookupId
            {
                get
                {
                    return this.lookupId;
                }
            }

            /// <summary>Gets the PROPID_M_LAST_MOVE_TIME property.</summary>
            public IntProperty LastMoveTime
            {
                get
                {
                    return this.lastMoveTime;
                }
            }
        }
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK