TwoPhaseCommit.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / TransactionBridge / Microsoft / Transactions / Wsat / InputOutput / TwoPhaseCommit.cs / 1 / TwoPhaseCommit.cs

                            //------------------------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------

// This file implements two phase commit-related messaging 

using System; 
using System.ServiceModel.Channels; 
using System.Diagnostics;
using System.ServiceModel; 
using System.Xml;

using Microsoft.Transactions.Wsat.Messaging;
using Microsoft.Transactions.Wsat.Protocol; 
using Microsoft.Transactions.Wsat.StateMachines;
 
using DiagnosticUtility = Microsoft.Transactions.Bridge.DiagnosticUtility; 
using Fault = Microsoft.Transactions.Wsat.Messaging.Fault;
 
namespace Microsoft.Transactions.Wsat.InputOutput
{
    class TwoPhaseCommitCoordinator : ITwoPhaseCommitCoordinator
    { 
        ProtocolState state;
 
        AsyncCallback sendComplete; 
        AsyncCallback politeSendComplete;
 
        public TwoPhaseCommitCoordinator(ProtocolState state)
        {
            this.state = state;
 
            this.sendComplete = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(SendComplete));
            this.politeSendComplete = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(PoliteSendComplete)); 
        } 

        // 
        // ITwoPhaseCommitCoordinator
        //

        ParticipantEnlistment CheckMessage(Message message, bool fault, bool preparedOrReplay) 
        {
            Guid enlistmentId; 
            ControlProtocol protocol; 
            if (!Ports.TryGetEnlistment(message, out enlistmentId, out protocol))
            { 
                DebugTrace.Trace(TraceLevel.Warning, "Could not read enlistment header from message");
                if (fault) this.SendFault(message, this.state.Faults.InvalidParameters);
                return null;
            } 

            TransactionEnlistment enlistment = state.Lookup.FindEnlistment(enlistmentId); 
            if (enlistment == null) 
            {
                DebugTrace.Trace(TraceLevel.Verbose, "Enlistment {0} could not be found", enlistmentId); 

                if (preparedOrReplay)
                {
                    if (protocol == ControlProtocol.Volatile2PC) 
                    {
                        if (DebugTrace.Warning) 
                        { 
                            DebugTrace.Trace(
                                TraceLevel.Warning, 
                                "Received Prepared or Replay from unrecognized volatile participant at {0}",
                                Ports.TryGetFromAddress(message));
                        }
 
                        if (VolatileParticipantInDoubtRecord.ShouldTrace)
                        { 
                            VolatileParticipantInDoubtRecord.Trace( 
                                enlistmentId,
                                Library.GetReplyToHeader(message.Headers), 
                                this.state.ProtocolVersion
                                );
                        }
 
                        this.SendFault(message, this.state.Faults.UnknownTransaction);
                    } 
                    else if (protocol == ControlProtocol.Durable2PC) 
                    {
                        // Be polite 
                        this.SendRollback(message);
                    }
                    else
                    { 
                        this.SendFault(message, this.state.Faults.InvalidParameters);
                    } 
                } 
                else if (DebugTrace.Info)
                { 
                    DebugTrace.Trace(
                        TraceLevel.Info,
                        "Ignoring message from unrecognized participant at {0}",
                        Ports.TryGetFromAddress(message)); 
                }
 
                return null; 
            }
 
            ParticipantEnlistment participant = enlistment as ParticipantEnlistment;
            if (participant == null || protocol != participant.ControlProtocol)
            {
                DebugTrace.Trace(TraceLevel.Warning, "Enlistment state does not match message for {0}", enlistmentId); 
                if (fault) this.SendFault(message, this.state.Faults.InvalidParameters);
                return null; 
            } 

            if (participant.ParticipantProxy == null) 
            {
                DebugTrace.TxTrace(TraceLevel.Warning,
                                   participant.EnlistmentId,
                                   "Participant enlistment was not correctly recovered"); 

                if (fault) this.SendFault(message, this.state.Faults.InvalidPolicy); 
                return null; 
            }
 
            if (!state.Service.Security.CheckIdentity(participant.ParticipantProxy, message))
            {
                if (EnlistmentIdentityCheckFailedRecord.ShouldTrace)
                { 
                    EnlistmentIdentityCheckFailedRecord.Trace(participant.EnlistmentId);
                } 
 
                // no fault reply is sent in order to replicate the security
                // infrastructure behavior - see MB55336 

                return null;
            }
 
            return participant;
        } 
 
        public void Prepared(Message message)
        { 
            ParticipantEnlistment participant = CheckMessage(message, true, true);
            if (participant != null)
            {
                participant.StateMachine.Enqueue(new MsgPreparedEvent(participant)); 
            }
        } 
 
        public void Aborted(Message message)
        { 
            ParticipantEnlistment participant = CheckMessage(message, true, false);
            if (participant != null)
            {
                participant.StateMachine.Enqueue(new MsgAbortedEvent(participant)); 
            }
        } 
 
        public void ReadOnly(Message message)
        { 
            ParticipantEnlistment participant = CheckMessage(message, true, false);
            if (participant != null)
            {
                participant.StateMachine.Enqueue(new MsgReadOnlyEvent(participant)); 
            }
        } 
 
        public void Committed(Message message)
        { 
            ParticipantEnlistment participant = CheckMessage(message, true, false);
            if (participant != null)
            {
                participant.StateMachine.Enqueue(new MsgCommittedEvent(participant)); 
            }
        } 
 
        public void Replay(Message message)
        { 
            ProtocolVersionHelper.AssertProtocolVersion10(this.state.ProtocolVersion, this.GetType(), "Replay");

            ParticipantEnlistment participant = CheckMessage(message, true, true);
            if (participant != null) 
            {
                participant.StateMachine.Enqueue(new MsgReplayEvent(participant)); 
            } 
        }
 
        public void Fault(Message message, MessageFault fault)
        {
            ParticipantEnlistment participant = CheckMessage(message, false, false);
            if (participant != null) 
            {
                state.Perf.FaultsReceivedCountPerInterval.Increment(); 
 
                participant.StateMachine.Enqueue(new MsgParticipantFaultEvent(participant, fault));
            } 
        }

        //
        // Send complete 
        //
 
        void SendComplete(IAsyncResult ar) 
        {
            if (!ar.CompletedSynchronously) 
            {
                OnSendComplete(ar, (ParticipantEnlistment) ar.AsyncState);
            }
        } 

        void OnSendComplete(IAsyncResult ar, ParticipantEnlistment participant) 
        { 
            Exception failed = null;
            try 
            {
                participant.ParticipantProxy.EndSendMessage(ar);
            }
            catch (WsatSendFailureException e) 
            {
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Warning); 
                DebugTrace.TraceSendFailure(participant.EnlistmentId, e); 

                state.Perf.MessageSendFailureCountPerInterval.Increment(); 
                failed = e;
            }

            if (failed != null) 
            {
                participant.StateMachine.Enqueue(new MsgParticipantSendFailureEvent(participant)); 
            } 
        }
 
        void PoliteSendComplete(IAsyncResult ar)
        {
            if (!ar.CompletedSynchronously)
            { 
                OnPoliteSendComplete(ar, (TwoPhaseCommitParticipantProxy)ar.AsyncState);
            } 
        } 

        void OnPoliteSendComplete(IAsyncResult ar, TwoPhaseCommitParticipantProxy proxy) 
        {
            try
            {
                proxy.EndSendMessage(ar); 
            }
            catch (WsatSendFailureException e) 
            { 
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Warning);
                DebugTrace.TraceSendFailure(e); 

                state.Perf.MessageSendFailureCountPerInterval.Increment();

            } 
        }
 
        // 
        // Send messages
        // 

        public void SendPrepare(ParticipantEnlistment participant)
        {
            if (DebugTrace.Info) 
            {
                DebugTrace.TxTrace( 
                    TraceLevel.Info, 
                    participant.EnlistmentId,
                    "Sending Prepare to {0} participant at {1}", 
                    participant.ControlProtocol,
                    Ports.TryGetAddress(participant.ParticipantProxy));
            }
 
            IAsyncResult ar = participant.ParticipantProxy.BeginSendPrepare(this.sendComplete, participant);
            if (ar.CompletedSynchronously) 
            { 
                OnSendComplete(ar, participant);
            } 
        }

        public void SendCommit(ParticipantEnlistment participant)
        { 
            if (DebugTrace.Info)
            { 
                DebugTrace.TxTrace( 
                    TraceLevel.Info,
                    participant.EnlistmentId, 
                    "Sending Commit to {0} participant at {1}",
                    participant.ControlProtocol,
                    Ports.TryGetAddress(participant.ParticipantProxy));
            } 

            IAsyncResult ar = participant.ParticipantProxy.BeginSendCommit(this.sendComplete, participant); 
            if (ar.CompletedSynchronously) 
            {
                OnSendComplete(ar, participant); 
            }
        }

        public void SendCommit(EndpointAddress sendTo) 
        {
            if (sendTo != null) 
            { 
                TwoPhaseCommitParticipantProxy proxy = state.TryCreateTwoPhaseCommitParticipantProxy(sendTo);
                if (proxy != null) 
                {
                    try
                    {
                        if (DebugTrace.Info) 
                        {
                            DebugTrace.Trace(TraceLevel.Info, 
                                             "Sending Commit to unrecognized participant at {0}", 
                                             Ports.TryGetAddress(proxy));
                        } 

                        proxy.From = CreateForgottenSource();
                        IAsyncResult ar = proxy.BeginSendCommit(this.politeSendComplete, proxy);
                        if (ar.CompletedSynchronously) 
                        {
                            OnPoliteSendComplete(ar, proxy); 
                        } 
                    }
                    finally 
                    {
                        proxy.Release();
                    }
                } 
            }
        } 
 
        public void SendRollback(ParticipantEnlistment participant)
        { 
            if (DebugTrace.Info)
            {
                DebugTrace.TxTrace(
                    TraceLevel.Info, 
                    participant.EnlistmentId,
                    "Sending Rollback to {0} participant at {1}", 
                    participant.ControlProtocol, 
                    Ports.TryGetAddress(participant.ParticipantProxy));
            } 

            IAsyncResult ar = participant.ParticipantProxy.BeginSendRollback(this.sendComplete, participant);
            if (ar.CompletedSynchronously)
            { 
                OnSendComplete(ar, participant);
            } 
        } 

        void SendRollback(Message message) 
        {
            SendRollback(Library.GetReplyToHeader(message.Headers));
        }
 
        public void SendRollback(EndpointAddress sendTo)
        { 
            if (sendTo != null) 
            {
                TwoPhaseCommitParticipantProxy proxy = state.TryCreateTwoPhaseCommitParticipantProxy(sendTo); 
                if (proxy != null)
                {
                    try
                    { 
                        if (DebugTrace.Info)
                        { 
                            DebugTrace.Trace(TraceLevel.Info, 
                                             "Sending Rollback to unrecognized participant at {0}",
                                             Ports.TryGetAddress(proxy)); 
                        }

                        proxy.From = CreateForgottenSource();
                        IAsyncResult ar = proxy.BeginSendRollback(this.politeSendComplete, proxy); 
                        if (ar.CompletedSynchronously)
                        { 
                            OnPoliteSendComplete(ar, proxy); 
                        }
                    } 
                    finally
                    {
                        proxy.Release();
                    } 
                }
            } 
        } 

        EndpointAddress forgottenSource; 
        EndpointAddress CreateForgottenSource()
        {
            if (this.forgottenSource == null)
            { 
                EnlistmentHeader header = new EnlistmentHeader(Guid.Empty, ControlProtocol.None);
                this.forgottenSource = this.state.TwoPhaseCommitCoordinatorListener.CreateEndpointReference(header); 
            } 
            return this.forgottenSource;
        } 

        void SendFault(Message message, Fault fault)
        {
            SendFault(Library.GetFaultToHeader(message.Headers, this.state.ProtocolVersion), message.Headers.MessageId, fault); 
        }
 
        public void SendFault(EndpointAddress faultTo, UniqueId messageID, Fault fault) 
        {
            if (faultTo != null) 
            {
                state.FaultSender.TrySendTwoPhaseCommitParticipantFault(faultTo, messageID, fault);
            }
        } 
    }
 
    class TwoPhaseCommitParticipant : ITwoPhaseCommitParticipant 
    {
        ProtocolState state; 

        AsyncCallback durableSendComplete;
        AsyncCallback volatileSendComplete;
        AsyncCallback politeSendComplete; 

        public TwoPhaseCommitParticipant(ProtocolState state) 
        { 
            this.state = state;
 
            this.durableSendComplete = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(DurableSendComplete));
            this.volatileSendComplete = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(VolatileSendComplete));
            this.politeSendComplete = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(PoliteSendComplete));
        } 

        // 
        // ITwoPhaseCommitParticipant 
        //
 
        bool CheckMessage(Message message,
                           bool fault,
                           out CoordinatorEnlistment durableCoordinator,
                           out VolatileCoordinatorEnlistment volatileCoordinator) 
        {
            durableCoordinator = null; 
            volatileCoordinator = null; 

            Guid enlistmentId; 
            if (!Ports.TryGetEnlistment(message, out enlistmentId))
            {
                DebugTrace.Trace(TraceLevel.Warning, "Could not read enlistment header from message");
                if (fault) this.SendFault(message, this.state.Faults.InvalidParameters); 
                return false;
            } 
 
            TransactionEnlistment enlistment = state.Lookup.FindEnlistment(enlistmentId);
            if (enlistment == null) 
            {
                DebugTrace.Trace(TraceLevel.Warning, "Could not find enlistment {0}", enlistmentId);
                return true;
            } 

            // Check enlistment type, obtain coordinator proxy for access check 
            TwoPhaseCommitCoordinatorProxy proxy; 

            durableCoordinator = enlistment as CoordinatorEnlistment; 
            if (durableCoordinator == null)
            {
                volatileCoordinator = enlistment as VolatileCoordinatorEnlistment;
                if (volatileCoordinator == null) 
                {
                    DebugTrace.Trace(TraceLevel.Warning, "2PC message received for non-2PC enlistment {0}", enlistmentId); 
                    if (fault) this.SendFault(message, this.state.Faults.InvalidParameters); 
                    return false;
                } 
                proxy = volatileCoordinator.CoordinatorProxy;
            }
            else
            { 
                proxy = durableCoordinator.CoordinatorProxy;
            } 
 
            // Check proxy
            if (proxy == null) 
            {
                // This could either be a premature Prepare message(which can happen if the
                // RegisterResponse message is delayed and the committer jumps the gun) or it
                // could indicate a failure to recover a subordinate transaction 
                if (durableCoordinator != null &&
                    ReferenceEquals(durableCoordinator.StateMachine.State, 
                                    state.States.CoordinatorFailedRecovery)) 
                {
                    DebugTrace.TxTrace(TraceLevel.Warning, 
                                       enlistment.EnlistmentId,
                                       "Coordinator enlistment was not correctly recovered");

                    if (fault) this.SendFault(message, this.state.Faults.InvalidPolicy); 
                    return false;
                } 
                else 
                {
                    if (DebugTrace.Warning) 
                    {
                        DebugTrace.TxTrace(
                            TraceLevel.Warning,
                            enlistment.EnlistmentId, 
                            "Received premature message with action {0}",
                            message.Headers.Action 
                            ); 
                    }
 
                    // We don't forward this message on to the state machines because we can't authorize
                    // it. Hopefully our superior will abort the transaction when it sees this fault.
                    // We will simply allow the timeout to handle our local state.
                    if (fault) this.SendFault(message, this.state.Faults.InvalidState); 
                    return false;
                } 
            } 

            if (!state.Service.Security.CheckIdentity(proxy, message)) 
            {
                if (EnlistmentIdentityCheckFailedRecord.ShouldTrace)
                {
                    EnlistmentIdentityCheckFailedRecord.Trace(enlistment.EnlistmentId); 
                }
 
                // no fault reply is sent in order to replicate the security 
                // infrastructure behavior - see MB55336
 
                return false;
            }

            return true; 
        }
 
        public void Prepare(Message message) 
        {
            CoordinatorEnlistment coordinator; 
            VolatileCoordinatorEnlistment volatileCoordinator;

            if (CheckMessage(message, true, out coordinator, out volatileCoordinator))
            { 
                if (coordinator != null)
                { 
                    coordinator.StateMachine.Enqueue(new MsgDurablePrepareEvent(coordinator)); 
                }
                else if (volatileCoordinator != null) 
                {
                    volatileCoordinator.StateMachine.Enqueue(new MsgVolatilePrepareEvent(volatileCoordinator));
                }
                else 
                {
                    // Be polite 
                    this.SendAborted(message); 
                }
            } 
        }

        public void Commit(Message message)
        { 
            CoordinatorEnlistment coordinator;
            VolatileCoordinatorEnlistment volatileCoordinator; 
 
            if (CheckMessage(message, true, out coordinator, out volatileCoordinator))
            { 
                if (coordinator != null)
                {
                    coordinator.StateMachine.Enqueue(new MsgDurableCommitEvent(coordinator));
                } 
                else if (volatileCoordinator != null)
                { 
                    volatileCoordinator.StateMachine.Enqueue(new MsgVolatileCommitEvent(volatileCoordinator)); 
                }
                else 
                {
                    // Be polite
                    this.SendCommitted(message);
                } 
            }
        } 
 
        public void Rollback(Message message)
        { 
            CoordinatorEnlistment coordinator;
            VolatileCoordinatorEnlistment volatileCoordinator;

            if (CheckMessage(message, true, out coordinator, out volatileCoordinator)) 
            {
                if (coordinator != null) 
                { 
                    coordinator.StateMachine.Enqueue(new MsgDurableRollbackEvent(coordinator));
                } 
                else if (volatileCoordinator != null)
                {
                    volatileCoordinator.StateMachine.Enqueue(new MsgVolatileRollbackEvent(volatileCoordinator));
                } 
                else
                { 
                    // Be polite 
                    this.SendAborted(message);
                } 
            }
        }

        public void Fault(Message message, MessageFault fault) 
        {
            CoordinatorEnlistment coordinator; 
            VolatileCoordinatorEnlistment volatileCoordinator; 

            if (CheckMessage(message, false, out coordinator, out volatileCoordinator)) 
            {
                if (coordinator != null)
                {
                    coordinator.StateMachine.Enqueue(new MsgDurableCoordinatorFaultEvent(coordinator, fault)); 
                }
                else if (volatileCoordinator != null) 
                { 
                    volatileCoordinator.StateMachine.Enqueue(new MsgVolatileCoordinatorFaultEvent(volatileCoordinator, fault));
                } 
                else
                {
                    // Don't be polite with faults
                    if (DebugTrace.Info) 
                    {
                        DebugTrace.Trace(TraceLevel.Info, 
                                         "Ignoring {0} fault from unrecognized coordinator at {1}: {2}", 
                                         Library.GetFaultCodeName(fault),
                                         Ports.TryGetFromAddress(message), 
                                         Library.GetFaultCodeReason(fault));
                    }
                }
 
                state.Perf.FaultsReceivedCountPerInterval.Increment();
            } 
        } 

        // 
        // Send complete
        //

        void DurableSendComplete(IAsyncResult ar) 
        {
            if (!ar.CompletedSynchronously) 
            { 
                OnDurableSendComplete(ar, (CoordinatorEnlistment)ar.AsyncState);
            } 
        }

        void OnDurableSendComplete(IAsyncResult ar, CoordinatorEnlistment coordinator)
        { 
            Exception failed = null;
            try 
            { 
                coordinator.CoordinatorProxy.EndSendMessage(ar);
            } 
            catch (WsatSendFailureException e)
            {
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Warning);
                DebugTrace.TraceSendFailure(coordinator.EnlistmentId, e); 

                state.Perf.MessageSendFailureCountPerInterval.Increment(); 
                failed = e; 
            }
 
            if (failed != null)
            {
                coordinator.StateMachine.Enqueue(
                    new MsgDurableCoordinatorSendFailureEvent(coordinator)); 
            }
        } 
 
        void VolatileSendComplete(IAsyncResult ar)
        { 
            if (!ar.CompletedSynchronously)
            {
                OnVolatileSendComplete(ar, (VolatileCoordinatorEnlistment)ar.AsyncState);
            } 
        }
 
        void OnVolatileSendComplete(IAsyncResult ar, VolatileCoordinatorEnlistment volatileCoordinator) 
        {
            Exception failed = null; 
            try
            {
                volatileCoordinator.CoordinatorProxy.EndSendMessage(ar);
            } 
            catch (WsatSendFailureException e)
            { 
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Warning); 
                DebugTrace.TraceSendFailure(volatileCoordinator.EnlistmentId, e);
 
                state.Perf.MessageSendFailureCountPerInterval.Increment();
                failed = e;
            }
 
            if (failed != null)
            { 
                volatileCoordinator.StateMachine.Enqueue( 
                    new MsgVolatileCoordinatorSendFailureEvent(volatileCoordinator));
            } 
        }

        void PoliteSendComplete(IAsyncResult ar)
        { 
            if (!ar.CompletedSynchronously)
            { 
                OnPoliteSendComplete(ar, (TwoPhaseCommitCoordinatorProxy) ar.AsyncState); 
            }
        } 

        void OnPoliteSendComplete(IAsyncResult ar, TwoPhaseCommitCoordinatorProxy proxy)
        {
            try 
            {
                proxy.EndSendMessage(ar); 
            } 
            catch (WsatSendFailureException e)
            { 
                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Warning);
                DebugTrace.TraceSendFailure(e);

                state.Perf.MessageSendFailureCountPerInterval.Increment(); 
            }
        } 
 
        //
        // Messages 
        //

        public void SendPrepared(CoordinatorEnlistment coordinator)
        { 
            if (DebugTrace.Info)
            { 
                DebugTrace.TxTrace( 
                    TraceLevel.Info,
                    coordinator.EnlistmentId, 
                    "Sending Prepared to durable coordinator at {0}",
                    Ports.TryGetAddress(coordinator.CoordinatorProxy));
            }
 
            IAsyncResult ar = coordinator.CoordinatorProxy.BeginSendPrepared(this.durableSendComplete, coordinator);
            if (ar.CompletedSynchronously) 
            { 
                OnDurableSendComplete(ar, coordinator);
            } 
        }

        public void SendDurableReadOnly(CoordinatorEnlistment coordinator)
        { 
            if (DebugTrace.Info)
            { 
                DebugTrace.TxTrace( 
                    TraceLevel.Info,
                    coordinator.EnlistmentId, 
                    "Sending ReadOnly to durable coordinator at {0}",
                    Ports.TryGetAddress(coordinator.CoordinatorProxy));
            }
 
            IAsyncResult ar = coordinator.CoordinatorProxy.BeginSendReadOnly(this.durableSendComplete, coordinator);
            if (ar.CompletedSynchronously) 
            { 
                OnDurableSendComplete(ar, coordinator);
            } 
        }

        public void SendVolatileReadOnly(VolatileCoordinatorEnlistment coordinator)
        { 
            if (DebugTrace.Info)
            { 
                DebugTrace.TxTrace( 
                    TraceLevel.Info,
                    coordinator.EnlistmentId, 
                    "Sending ReadOnly to volatile coordinator at {0}",
                    Ports.TryGetAddress(coordinator.CoordinatorProxy));
            }
 
            IAsyncResult ar = coordinator.CoordinatorProxy.BeginSendReadOnly(this.volatileSendComplete, coordinator);
            if (ar.CompletedSynchronously) 
            { 
                OnVolatileSendComplete(ar, coordinator);
            } 
        }

        public void SendReadOnly(EndpointAddress sendTo)
        { 
            if (sendTo != null)
            { 
                TwoPhaseCommitCoordinatorProxy proxy = state.TryCreateTwoPhaseCommitCoordinatorProxy(sendTo); 
                if (proxy != null)
                { 
                    try
                    {
                        if (DebugTrace.Info)
                        { 
                            DebugTrace.Trace(TraceLevel.Info,
                                             "Sending ReadOnly to unrecognized participant at {0}", 
                                             Ports.TryGetAddress(proxy)); 
                        }
 
                        IAsyncResult ar = proxy.BeginSendReadOnly(this.politeSendComplete, proxy);
                        if (ar.CompletedSynchronously)
                        {
                            OnPoliteSendComplete(ar, proxy); 
                        }
                    } 
                    finally 
                    {
                        proxy.Release(); 
                    }
                }
            }
        } 

        public void SendCommitted(CoordinatorEnlistment coordinator) 
        { 
            if (DebugTrace.Info)
            { 
                DebugTrace.TxTrace(
                    TraceLevel.Info,
                    coordinator.EnlistmentId,
                    "Sending Committed to durable coordinator at {0}", 
                    Ports.TryGetAddress(coordinator.CoordinatorProxy));
            } 
 
            IAsyncResult ar = coordinator.CoordinatorProxy.BeginSendCommitted(this.durableSendComplete, coordinator);
            if (ar.CompletedSynchronously) 
            {
                OnDurableSendComplete(ar, coordinator);
            }
        } 

        void SendCommitted(Message message) 
        { 
            SendCommitted(Library.GetReplyToHeader(message.Headers));
        } 

        public void SendCommitted(EndpointAddress sendTo)
        {
            if (sendTo != null) 
            {
                TwoPhaseCommitCoordinatorProxy proxy = state.TryCreateTwoPhaseCommitCoordinatorProxy(sendTo); 
                if (proxy != null) 
                {
                    try 
                    {
                        if (DebugTrace.Info)
                        {
                            DebugTrace.Trace(TraceLevel.Info, 
                                             "Sending Committed to unrecognized coordinator at {0}",
                                             Ports.TryGetAddress(proxy)); 
                        } 

                        IAsyncResult ar = proxy.BeginSendCommitted(this.politeSendComplete, proxy); 
                        if (ar.CompletedSynchronously)
                        {
                            OnPoliteSendComplete(ar, proxy);
                        } 
                    }
                    finally 
                    { 
                        proxy.Release();
                    } 
                }
            }
        }
 
        public void SendDurableAborted(CoordinatorEnlistment coordinator)
        { 
            if (DebugTrace.Info) 
            {
                DebugTrace.TxTrace( 
                    TraceLevel.Info,
                    coordinator.EnlistmentId,
                    "Sending Aborted to durable coordinator at {0}",
                    Ports.TryGetAddress(coordinator.CoordinatorProxy)); 
            }
 
            IAsyncResult ar = coordinator.CoordinatorProxy.BeginSendAborted(this.durableSendComplete, coordinator); 
            if (ar.CompletedSynchronously)
            { 
                OnDurableSendComplete(ar, coordinator);
            }
        }
 
        public void SendVolatileAborted(VolatileCoordinatorEnlistment coordinator)
        { 
            if (DebugTrace.Info) 
            {
                DebugTrace.TxTrace( 
                    TraceLevel.Info,
                    coordinator.EnlistmentId,
                    "Sending Aborted to volatile coordinator at {0}",
                    Ports.TryGetAddress(coordinator.CoordinatorProxy)); 
            }
 
            IAsyncResult ar = coordinator.CoordinatorProxy.BeginSendAborted(this.volatileSendComplete, coordinator); 
            if (ar.CompletedSynchronously)
            { 
                OnVolatileSendComplete(ar, coordinator);
            }
        }
 
        void SendAborted(Message message)
        { 
            SendAborted(Library.GetReplyToHeader(message.Headers)); 
        }
 
        public void SendAborted(EndpointAddress sendTo)
        {
            if (sendTo != null)
            { 
                TwoPhaseCommitCoordinatorProxy proxy = state.TryCreateTwoPhaseCommitCoordinatorProxy(sendTo);
                if (proxy != null) 
                { 
                    try
                    { 
                        if (DebugTrace.Info)
                        {
                            DebugTrace.Trace(TraceLevel.Info,
                                             "Sending Aborted to unrecognized coordinator at {0}", 
                                             Ports.TryGetAddress(proxy));
                        } 
 
                        IAsyncResult ar = proxy.BeginSendAborted(this.politeSendComplete, proxy);
                        if (ar.CompletedSynchronously) 
                        {
                            OnPoliteSendComplete(ar, proxy);
                        }
                    } 
                    finally
                    { 
                        proxy.Release(); 
                    }
                } 
            }
        }

        public void SendRecoverMessage(CoordinatorEnlistment coordinator) 
        {
            if (DebugTrace.Info) 
            { 
                DebugTrace.TxTrace(
                    TraceLevel.Info, 
                    coordinator.EnlistmentId,
                    "Sending Replay to durable coordinator at {0}",
                    Ports.TryGetAddress(coordinator.CoordinatorProxy));
            } 

            IAsyncResult ar = coordinator.CoordinatorProxy.BeginSendRecoverMessage(this.durableSendComplete, coordinator); 
            if (ar.CompletedSynchronously) 
            {
                OnDurableSendComplete(ar, coordinator); 
            }
        }

        void SendFault(Message message, Fault fault) 
        {
            SendFault(Library.GetFaultToHeader(message.Headers, this.state.ProtocolVersion), message.Headers.MessageId, fault); 
        } 

        public void SendFault(EndpointAddress faultTo, UniqueId messageID, Fault fault) 
        {
            if (faultTo != null)
            {
                state.FaultSender.TrySendTwoPhaseCommitCoordinatorFault(faultTo, messageID, fault); 
            }
        } 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK