FramingChannels.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / FramingChannels.cs / 1 / FramingChannels.cs

                            //------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------

namespace System.ServiceModel.Channels 
{
    using System.Collections.Generic; 
    using System.ServiceModel; 
    using System.Diagnostics;
    using System.IO; 
    using System.Runtime.CompilerServices;
    using System.IdentityModel.Claims;
    using System.IdentityModel.Policy;
    using System.ServiceModel.Diagnostics; 
    using System.ServiceModel.Security;
    using System.Threading; 
 
    abstract class FramingDuplexSessionChannel : TransportOutputChannel, IDuplexSessionChannel
    { 
        BufferManager bufferManager;
        IConnection connection;
        ConnectionDuplexSession duplexSession;
        bool exposeConnectionProperty; 
        bool isInputSessionClosed;
        bool isOutputSessionClosed; 
        MessageEncoder messageEncoder; 
        SynchronizedMessageSource messageSource;
        SecurityMessageProperty remoteSecurity; 
        EndpointAddress localAddress;
        ThreadNeutralSemaphore sendLock = new ThreadNeutralSemaphore(1);
        Uri localVia;
 
        FramingDuplexSessionChannel(ChannelManagerBase manager, IConnectionOrientedTransportFactorySettings settings,
            EndpointAddress localAddress, Uri localVia, EndpointAddress remoteAddresss, Uri via, bool exposeConnectionProperty) 
            : base(manager, remoteAddresss, via, settings.ManualAddressing, settings.MessageVersion) 
        {
            this.localAddress = localAddress; 
            this.localVia = localVia;
            this.exposeConnectionProperty = exposeConnectionProperty;
            this.bufferManager = settings.BufferManager;
        } 

        protected FramingDuplexSessionChannel(ChannelManagerBase factory, IConnectionOrientedTransportFactorySettings settings, 
            EndpointAddress remoteAddresss, Uri via, bool exposeConnectionProperty) 
            : this(factory, settings, EndpointAddress.AnonymousAddress, settings.MessageVersion.Addressing.AnonymousUri,
            remoteAddresss, via, exposeConnectionProperty) 
        {
            this.duplexSession = ConnectionDuplexSession.CreateSession(this, settings.Upgrade);
        }
 
        protected FramingDuplexSessionChannel(ConnectionOrientedTransportChannelListener channelListener,
            EndpointAddress localAddress, Uri localVia, bool exposeConnectionProperty) 
            : this(channelListener, channelListener, localAddress, localVia, 
            EndpointAddress.AnonymousAddress, channelListener.MessageVersion.Addressing.AnonymousUri, exposeConnectionProperty)
        { 
            this.duplexSession = ConnectionDuplexSession.CreateSession(this, channelListener.Upgrade);
        }

        protected BufferManager BufferManager 
        {
            get 
            { 
                return this.bufferManager;
            } 
        }

        protected IConnection Connection
        { 
            get
            { 
                return connection; 
            }
            set 
            {
                this.connection = value;
            }
        } 

        public EndpointAddress LocalAddress 
        { 
            get { return localAddress; }
        } 

        protected Uri LocalVia
        {
            get { return this.localVia; } 
        }
 
        protected MessageEncoder MessageEncoder 
        {
            get { return this.messageEncoder; } 
            set { this.messageEncoder = value; }
        }

        public SecurityMessageProperty RemoteSecurity 
        {
            get { return this.remoteSecurity; } 
            protected set { this.remoteSecurity = value; } 
        }
 
        public IDuplexSession Session
        {
            get { return this.duplexSession; }
        } 

        protected void SetMessageSource(IMessageSource messageSource) 
        { 
            this.messageSource = new SynchronizedMessageSource(messageSource);
        } 

        void OnInputSessionClosed()
        {
            lock (ThisLock) 
            {
                if (isInputSessionClosed) 
                    return; 

                isInputSessionClosed = true; 
            }
        }

        void OnOutputSessionClosed(ref TimeoutHelper timeoutHelper) 
        {
            bool releaseConnection = false; 
            lock (ThisLock) 
            {
                if (isInputSessionClosed) // we're all done, release the connection 
                {
                    releaseConnection = true;
                }
            } 

            if (releaseConnection) 
            { 
                ReturnConnectionIfNecessary(false, timeoutHelper.RemainingTime());
            } 
        }

        IAsyncResult BeginCloseOutputSession(TimeSpan timeout, AsyncCallback callback, object state)
        { 
            return new CloseOutputSessionAsyncResult(this, timeout, callback, state);
        } 
 
        void EndCloseOutputSession(IAsyncResult result)
        { 
            CloseOutputSessionAsyncResult.End(result);
        }

        void CloseOutputSession(TimeSpan timeout) 
        {
            ThrowIfNotOpened(); 
            ThrowIfFaulted(); 
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            if (!sendLock.TryEnter(timeoutHelper.RemainingTime())) 
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                    new TimeoutException(SR.GetString(SR.CloseTimedOut, timeout),
                    ThreadNeutralSemaphore.CreateEnterTimedOutException(timeout))); 
            }
 
            try 
            {
                // check again in case the previous send faulted while we were waiting for the lock 
                ThrowIfFaulted();

                // we're synchronized by sendLock here
                if (isOutputSessionClosed) 
                {
                    return; 
                } 
                isOutputSessionClosed = true;
                bool shouldFault = true; 
                try
                {
                    Connection.Write(SessionEncoder.EndBytes, 0, SessionEncoder.EndBytes.Length, true, timeoutHelper.RemainingTime());
                    OnOutputSessionClosed(ref timeoutHelper); 
                    shouldFault = false;
                } 
                finally 
                {
                    if (shouldFault) 
                    {
                        this.Fault();
                    }
                } 
            }
            finally 
            { 
                sendLock.Exit();
            } 
        }

        // cleanup after the framing handshake has completed
        void CompleteClose(TimeSpan timeout) 
        {
            ReturnConnectionIfNecessary(false, timeout); 
        } 

        // used to return cached connection to the pool/reader pool 
        protected abstract void ReturnConnectionIfNecessary(bool abort, TimeSpan timeout);

        protected override void OnAbort()
        { 
            ReturnConnectionIfNecessary(true, TimeSpan.Zero);
        } 
 
        protected override void OnFaulted()
        { 
            base.OnFaulted();
            ReturnConnectionIfNecessary(true, TimeSpan.Zero);
        }
 
        protected override void OnClose(TimeSpan timeout)
        { 
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 
            this.CloseOutputSession(timeoutHelper.RemainingTime());
 
            // close input session if necessary
            if (!isInputSessionClosed)
            {
                Message message = messageSource.Receive(timeoutHelper.RemainingTime()); 
                if (message != null)
                { 
                    using (message) 
                    {
                        ProtocolException error = ProtocolException.ReceiveShutdownReturnedNonNull(message); 
                        throw TraceUtility.ThrowHelperError(error, message);
                    }
                }
                OnInputSessionClosed(); 
            }
 
            CompleteClose(timeoutHelper.RemainingTime()); 
        }
 
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CloseAsyncResult(this, timeout, callback, state);
        } 

        protected override void OnEndClose(IAsyncResult result) 
        { 
            CloseAsyncResult.End(result);
        } 

        public Message Receive()
        {
            return this.Receive(this.DefaultReceiveTimeout); 
        }
 
        public Message Receive(TimeSpan timeout) 
        {
            Message message = null; 
            if (DoneReceivingInCurrentState())
            {
                return null;
            } 

            bool shouldFault = true; 
            try 
            {
                message = messageSource.Receive(timeout); 
                OnReceiveMessage(message);
                shouldFault = false;
                return message;
            } 
            finally
            { 
                if (shouldFault) 
                {
                    if (message != null) 
                    {
                        message.Close();
                        message = null;
                    } 
                    this.Fault();
                } 
            } 
        }
 
        public IAsyncResult BeginReceive(AsyncCallback callback, object state)
        {
            return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
        } 

        public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state) 
        { 
            if (DoneReceivingInCurrentState())
            { 
                return new DoneReceivingAsyncResult(callback, state);
            }

            bool shouldFault = true; 
            try
            { 
                IAsyncResult result = messageSource.BeginReceive(timeout, callback, state); 
                shouldFault = false;
                return result; 
            }
            finally
            {
                if (shouldFault) 
                {
                    this.Fault(); 
                } 
            }
        } 

        public Message EndReceive(IAsyncResult result)
        {
            base.ThrowIfNotOpened(); // we can't be in Created or Opening 
            if (result == null)
            { 
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result"); 
            }
 
            DoneReceivingAsyncResult doneReceivingResult = result as DoneReceivingAsyncResult;
            if (doneReceivingResult != null)
            {
                DoneReceivingAsyncResult.End(doneReceivingResult); 
                return null;
            } 
 
            bool shouldFault = true;
            Message message = null; 
            try
            {
                message = messageSource.EndReceive(result);
                OnReceiveMessage(message); 
                shouldFault = false;
                return message; 
            } 
            finally
            { 
                if (shouldFault)
                {
                    if (message != null)
                    { 
                        message.Close();
                        message = null; 
                    } 
                    this.Fault();
                } 
            }
        }

        public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) 
        {
            return new TryReceiveAsyncResult(this, timeout, callback, state); 
        } 

        public bool EndTryReceive(IAsyncResult result, out Message message) 
        {
            return TryReceiveAsyncResult.End(result, out message);
        }
 
        void OnReceiveMessage(Message message)
        { 
            if (message == null) 
            {
                OnInputSessionClosed(); 
            }
            else
            {
                PrepareMessage(message); 
            }
        } 
 
        public bool TryReceive(TimeSpan timeout, out Message message)
        { 
            try
            {
                message = this.Receive(timeout);
                return true; 
            }
            catch (TimeoutException e) 
            { 
                if (DiagnosticUtility.ShouldTraceInformation)
                { 
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
                }
                message = null;
                return false; 
            }
        } 
 
        protected virtual void PrepareMessage(Message message)
        { 
            message.Properties.Via = localVia;

            if (exposeConnectionProperty)
            { 
                message.Properties[ConnectionMessageProperty.Name] = this.connection;
            } 
 
            if (DiagnosticUtility.ShouldTraceInformation)
            { 
                TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageReceived,
                    MessageTransmitTraceRecord.CreateReceiveTraceRecord(message, this.LocalAddress), this, null, message);
            }
        } 

        public bool WaitForMessage(TimeSpan timeout) 
        { 
            if (DoneReceivingInCurrentState())
            { 
                return true;
            }

            bool shouldFault = true; 
            try
            { 
                bool success = messageSource.WaitForMessage(timeout); 
                shouldFault = !success; // need to fault if we've timed out because we're now toast
                return success; 
            }
            finally
            {
                if (shouldFault) 
                {
                    this.Fault(); 
                } 
            }
        } 

        public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (DoneReceivingInCurrentState()) 
            {
                return new DoneReceivingAsyncResult(callback, state); 
            } 

            bool shouldFault = true; 
            try
            {
                IAsyncResult result = messageSource.BeginWaitForMessage(timeout, callback, state);
                shouldFault = false; 
                return result;
            } 
            finally 
            {
                if (shouldFault) 
                {
                    this.Fault();
                }
            } 
        }
 
        public bool EndWaitForMessage(IAsyncResult result) 
        {
            base.ThrowIfNotOpened(); // we can't be in Created or Opening 
            if (result == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
            } 

            DoneReceivingAsyncResult doneRecevingResult = result as DoneReceivingAsyncResult; 
            if (doneRecevingResult != null) 
            {
                return DoneReceivingAsyncResult.End(doneRecevingResult); 
            }

            bool shouldFault = true;
            try 
            {
                bool success = messageSource.EndWaitForMessage(result); 
                shouldFault = !success; // need to fault if we've timed out because we're now toast 
                return success;
            } 
            finally
            {
                if (shouldFault)
                { 
                    this.Fault();
                } 
            } 
        }
 
        ArraySegment EncodeMessage(Message message)
        {
            ArraySegment messageData = MessageEncoder.WriteMessage(message,
                int.MaxValue, bufferManager, SessionEncoder.MaxMessageFrameSize); 

            messageData = SessionEncoder.EncodeMessageFrame(messageData); 
 
            return messageData;
        } 

        // must be called under sendLock
        void ThrowIfOutputSessionClosed()
        { 
            if (isOutputSessionClosed)
            { 
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.SendCannotBeCalledAfterCloseOutputSession))); 
            }
        } 

        protected override void OnSend(Message message, TimeSpan timeout)
        {
            ThrowIfDisposedOrNotOpen(); 

            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 
            if (!sendLock.TryEnter(timeoutHelper.RemainingTime())) 
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError( 
                    new TimeoutException(SR.GetString(SR.SendToViaTimedOut, Via, timeout),
                    ThreadNeutralSemaphore.CreateEnterTimedOutException(timeout)));
            }
 
            try
            { 
                // check again in case the previous send faulted while we were waiting for the lock 
                ThrowIfDisposedOrNotOpen();
                ThrowIfOutputSessionClosed(); 

                bool success = false;
                try
                { 
                    bool allowOutputBatching;
                    ArraySegment messageData; 
                    allowOutputBatching = message.Properties.AllowOutputBatching; 
                    messageData = EncodeMessage(message);
 
                    Connection.Write(messageData.Array, messageData.Offset, messageData.Count, !allowOutputBatching,
                        timeoutHelper.RemainingTime(), bufferManager);
                    success = true;
                } 
                finally
                { 
                    if (!success) 
                    {
                        this.Fault(); 
                    }
                }
            }
            finally 
            {
                sendLock.Exit(); 
            } 
        }
 
        protected override IAsyncResult OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowIfDisposedOrNotOpen();
            return new SendAsyncResult(this, message, timeout, callback, state); 
        }
 
        protected override void OnEndSend(IAsyncResult result) 
        {
            SendAsyncResult.End(result); 
        }

        class ConnectionDuplexSession : IDuplexSession
        { 
            FramingDuplexSessionChannel channel;
            string id; 
            static UriGenerator uriGenerator; 

            ConnectionDuplexSession(FramingDuplexSessionChannel channel) 
                : base()
            {
                this.channel = channel;
            } 

            public string Id 
            { 
                get
                { 
                    if (this.id == null)
                    {
                        lock (channel.ThisLock)
                        { 
                            if (this.id == null)
                            { 
                                this.id = UriGenerator.Next(); 
                            }
                        } 
                    }
                    return this.id;
                }
            } 

            static UriGenerator UriGenerator 
            { 
                get
                { 
                    if (uriGenerator == null)
                    {
                        uriGenerator = new UriGenerator();
                    } 
                    return uriGenerator;
                } 
            } 

            public IAsyncResult BeginCloseOutputSession(AsyncCallback callback, object state) 
            {
                return this.BeginCloseOutputSession(channel.DefaultCloseTimeout, callback, state);
            }
 
            public IAsyncResult BeginCloseOutputSession(TimeSpan timeout, AsyncCallback callback, object state)
            { 
                return channel.BeginCloseOutputSession(timeout, callback, state); 
            }
 
            public void EndCloseOutputSession(IAsyncResult result)
            {
                channel.EndCloseOutputSession(result);
            } 

            public void CloseOutputSession() 
            { 
                this.CloseOutputSession(channel.DefaultCloseTimeout);
            } 

            public void CloseOutputSession(TimeSpan timeout)
            {
                channel.CloseOutputSession(timeout); 
            }
 
            public static ConnectionDuplexSession CreateSession(FramingDuplexSessionChannel channel, 
                StreamUpgradeProvider upgrade)
            { 
                StreamSecurityUpgradeProvider security = upgrade as StreamSecurityUpgradeProvider;
                if (security == null)
                {
                    return new ConnectionDuplexSession(channel); 
                }
                else 
                { 
                    return new SecureConnectionDuplexSession(channel);
                } 
            }

            class SecureConnectionDuplexSession : ConnectionDuplexSession, ISecuritySession
            { 
                EndpointIdentity remoteIdentity;
 
                public SecureConnectionDuplexSession(FramingDuplexSessionChannel channel) 
                    : base(channel)
                { 
                    // empty
                }

                EndpointIdentity ISecuritySession.RemoteIdentity 
                {
                    get 
                    { 
                        if (remoteIdentity == null)
                        { 
                            SecurityMessageProperty security = channel.RemoteSecurity;
                            if (security != null && security.ServiceSecurityContext != null &&
                                security.ServiceSecurityContext.IdentityClaim != null &&
                                security.ServiceSecurityContext.PrimaryIdentity != null) 
                            {
                                this.remoteIdentity = EndpointIdentity.CreateIdentity( 
                                    security.ServiceSecurityContext.IdentityClaim); 
                            }
                        } 

                        return this.remoteIdentity;
                    }
                } 
            }
        } 
 
        class CloseAsyncResult : AsyncResult
        { 
            FramingDuplexSessionChannel channel;
            TimeoutHelper timeoutHelper;
            static AsyncCallback onCloseOutputSession = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnCloseOutputSession));
            static AsyncCallback onCloseInputSession = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnCloseInputSession)); 
            static WaitCallback onCompleteCloseScheduled;
 
            public CloseAsyncResult(FramingDuplexSessionChannel channel, TimeSpan timeout, 
                AsyncCallback callback, object state)
                : base(callback, state) 
            {
                this.channel = channel;
                this.timeoutHelper = new TimeoutHelper(timeout);
                IAsyncResult result = 
                    channel.BeginCloseOutputSession(timeoutHelper.RemainingTime(), onCloseOutputSession, this);
 
                if (!result.CompletedSynchronously) 
                {
                    return; 
                }

                if (!HandleCloseOutputSession(result, true))
                { 
                    return;
                } 
 
                base.Complete(true);
            } 

            public static void End(IAsyncResult result)
            {
                AsyncResult.End(result); 
            }
 
            bool HandleCloseOutputSession(IAsyncResult result, bool isStillSynchronous) 
            {
                channel.EndCloseOutputSession(result); 

                if (channel.isInputSessionClosed)
                {
                    return ScheduleCompleteClose(isStillSynchronous); 
                }
                else 
                { 
                    IAsyncResult closeInputSessionResult =
                        channel.messageSource.BeginReceive(timeoutHelper.RemainingTime(), onCloseInputSession, this); 

                    if (!closeInputSessionResult.CompletedSynchronously)
                    {
                        return false; 
                    }
 
                    return HandleCloseInputSession(closeInputSessionResult, isStillSynchronous); 
                }
            } 

            bool HandleCloseInputSession(IAsyncResult result, bool isStillSynchronous)
            {
                Message message = channel.messageSource.EndReceive(result); 
                if (message != null)
                { 
                    using (message) 
                    {
                        ProtocolException error = ProtocolException.ReceiveShutdownReturnedNonNull(message); 
                        throw TraceUtility.ThrowHelperError(error, message);
                    }
                }
                channel.OnInputSessionClosed(); 
                return ScheduleCompleteClose(isStillSynchronous);
            } 
 
            bool ScheduleCompleteClose(bool isStillSynchronous)
            { 
                if (isStillSynchronous)
                {
                    if (onCompleteCloseScheduled == null)
                    { 
                        onCompleteCloseScheduled = new WaitCallback(OnCompleteCloseScheduled);
                    } 
                    IOThreadScheduler.ScheduleCallback(onCompleteCloseScheduled, this); 
                    return false;
                } 
                else
                {
                    this.OnCompleteCloseScheduled();
                    return true; 
                }
            } 
 
            static void OnCloseOutputSession(IAsyncResult result)
            { 
                if (result.CompletedSynchronously)
                {
                    return;
                } 

                CloseAsyncResult thisPtr = (CloseAsyncResult)result.AsyncState; 
                bool completeSelf = false; 
                Exception completionException = null;
                try 
                {
                    completeSelf = thisPtr.HandleCloseOutputSession(result, false);
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread 
                catch (Exception e)
                { 
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw; 
                    }

                    completeSelf = true;
                    completionException = e; 
                }
 
                if (completeSelf) 
                {
                    thisPtr.Complete(false, completionException); 
                }
            }

            static void OnCloseInputSession(IAsyncResult result) 
            {
                if (result.CompletedSynchronously) 
                { 
                    return;
                } 

                CloseAsyncResult thisPtr = (CloseAsyncResult)result.AsyncState;
                bool completeSelf = false;
                Exception completionException = null; 
                try
                { 
                    completeSelf = thisPtr.HandleCloseInputSession(result, false); 
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread 
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e))
                    { 
                        throw;
                    } 
 
                    completeSelf = true;
                    completionException = e; 
                }

                if (completeSelf)
                { 
                    thisPtr.Complete(false, completionException);
                } 
            } 

            static void OnCompleteCloseScheduled(object state) 
            {
                CloseAsyncResult thisPtr = (CloseAsyncResult)state;
                Exception completionException = null;
                try 
                {
                    thisPtr.OnCompleteCloseScheduled(); 
                } 
                catch (Exception e)
                { 
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw;
                    } 
                    completionException = e;
                } 
                thisPtr.Complete(false, completionException); 
            }
 
            void OnCompleteCloseScheduled()
            {
                channel.CompleteClose(timeoutHelper.RemainingTime());
            } 
        }
 
        class CloseOutputSessionAsyncResult : AsyncResult 
        {
            FramingDuplexSessionChannel channel; 
            TimeoutHelper timeoutHelper;
            static AsyncCallback onWriteComplete = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWriteComplete));
            static WaitCallback onEnterComplete = new WaitCallback(OnEnterComplete);
 
            public CloseOutputSessionAsyncResult(FramingDuplexSessionChannel channel, TimeSpan timeout,
                AsyncCallback callback, object state) 
                : base(callback, state) 
            {
                channel.ThrowIfNotOpened(); 
                channel.ThrowIfFaulted();

                this.timeoutHelper = new TimeoutHelper(timeout);
                this.channel = channel; 

                if (!channel.sendLock.Enter(onEnterComplete, this)) 
                { 
                    return;
                } 

                bool completeSelf = false;
                bool writeSuccess = false;
 
                try
                { 
                    completeSelf = WriteEndBytes(); 
                    writeSuccess = true;
                } 
                finally
                {
                    if (!writeSuccess)
                    { 
                        Cleanup(false);
                    } 
                } 

                if (completeSelf) 
                {
                    Cleanup(true);
                    base.Complete(true);
                } 
            }
 
            bool WriteEndBytes() 
            {
                // check again in case we faulted while we were waiting for the lock 
                channel.ThrowIfFaulted();

                // we're synchronized by sendLock here
                if (channel.isOutputSessionClosed) 
                {
                    return true; 
                } 

                channel.isOutputSessionClosed = true; 

                IAsyncResult result = channel.Connection.BeginWrite(SessionEncoder.EndBytes, 0, SessionEncoder.EndBytes.Length,
                    true, timeoutHelper.RemainingTime(), onWriteComplete, this);
 
                if (!result.CompletedSynchronously)
                { 
                    return false; 
                }
 
                HandleWriteEndBytesComplete(result);
                return true;
            }
 
            void HandleWriteEndBytesComplete(IAsyncResult result)
            { 
                channel.Connection.EndWrite(result); 
                channel.OnOutputSessionClosed(ref this.timeoutHelper);
            } 

            void Cleanup(bool success)
            {
                try 
                {
                    if (!success) 
                    { 
                        channel.Fault();
                    } 
                }
                finally
                {
                    channel.sendLock.Exit(); 
                }
            } 
 
            public static void End(IAsyncResult result)
            { 
                AsyncResult.End(result);
            }

            static void OnEnterComplete(object state) 
            {
                CloseOutputSessionAsyncResult thisPtr = (CloseOutputSessionAsyncResult)state; 
                bool completeSelf = false; 
                Exception completionException = null;
                try 
                {
                    completeSelf = thisPtr.WriteEndBytes();
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread 
                catch (Exception e)
                { 
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw; 
                    }

                    completeSelf = true;
                    completionException = e; 
                }
 
                if (completeSelf) 
                {
                    thisPtr.Cleanup(completionException == null); 
                    thisPtr.Complete(false, completionException);
                }
            }
 
            static void OnWriteComplete(IAsyncResult result)
            { 
                if (result.CompletedSynchronously) 
                {
                    return; 
                }

                CloseOutputSessionAsyncResult thisPtr = (CloseOutputSessionAsyncResult)result.AsyncState;
                Exception completionException = null; 
                try
                { 
                    thisPtr.HandleWriteEndBytesComplete(result); 
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread 
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e))
                    { 
                        throw;
                    } 
 
                    completionException = e;
                } 
                thisPtr.Cleanup(completionException == null);
                thisPtr.Complete(false, completionException);
            }
        } 

        class SendAsyncResult : TraceAsyncResult 
        { 
            FramingDuplexSessionChannel channel;
            Message message; 
            byte[] buffer;
            TimeoutHelper timeoutHelper;
            static AsyncCallback onWriteComplete = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWriteComplete));
            static WaitCallback onEnterComplete = new WaitCallback(OnEnterComplete); 

            public SendAsyncResult(FramingDuplexSessionChannel channel, Message message, TimeSpan timeout, 
                AsyncCallback callback, object state) 
                : base(callback, state)
            { 
                this.timeoutHelper = new TimeoutHelper(timeout);
                this.channel = channel;
                this.message = message;
 
                if (!channel.sendLock.Enter(onEnterComplete, this))
                    return; 
 
                bool completeSelf = false;
                bool writeSuccess = false; 

                try
                {
                    completeSelf = WriteCore(); 
                    writeSuccess = true;
                } 
                finally 
                {
                    if (!writeSuccess) 
                    {
                        Cleanup(false);
                    }
                } 

                if (completeSelf) 
                { 
                    Cleanup(true);
                    base.Complete(true); 
                }
            }

            bool WriteCore() 
            {
                // check again in case the previous send faulted while we were waiting for the lock 
                channel.ThrowIfDisposedOrNotOpen(); 
                channel.ThrowIfOutputSessionClosed();
 
                bool allowOutputBatching;
                ArraySegment messageData;
                allowOutputBatching = message.Properties.AllowOutputBatching;
                messageData = channel.EncodeMessage(message); 

                message = null; 
                buffer = messageData.Array; 
                IAsyncResult asyncResult = channel.Connection.BeginWrite(messageData.Array, messageData.Offset, messageData.Count,
                    !allowOutputBatching, timeoutHelper.RemainingTime(), onWriteComplete, this); 

                if (!asyncResult.CompletedSynchronously)
                {
                    return false; 
                }
 
                channel.Connection.EndWrite(asyncResult); 
                return true;
            } 

            void Cleanup(bool success)
            {
                try 
                {
                    if (!success) 
                    { 
                        channel.Fault();
                    } 
                }
                finally
                {
                    channel.sendLock.Exit(); 
                }
 
                if (buffer != null) 
                {
                    channel.bufferManager.ReturnBuffer(buffer); 
                    buffer = null;
                }
            }
 
            public static void End(IAsyncResult result)
            { 
                AsyncResult.End(result); 
            }
 
            static void OnEnterComplete(object state)
            {
                SendAsyncResult thisPtr = (SendAsyncResult)state;
                bool completeSelf = false; 
                Exception completionException = null;
                try 
                { 
                    completeSelf = thisPtr.WriteCore();
                } 
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw; 
                    } 

                    completeSelf = true; 
                    completionException = e;
                }

                if (completeSelf) 
                {
                    thisPtr.Cleanup(completionException == null); 
                    thisPtr.Complete(false, completionException); 
                }
            } 

            static void OnWriteComplete(IAsyncResult result)
            {
                if (result.CompletedSynchronously) 
                    return;
 
                SendAsyncResult thisPtr = (SendAsyncResult)result.AsyncState; 
                Exception completionException = null;
                try 
                {
                    thisPtr.channel.Connection.EndWrite(result);
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread 
                catch (Exception e)
                { 
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw; 
                    }

                    completionException = e;
                } 
                thisPtr.Cleanup(completionException == null);
                thisPtr.Complete(false, completionException); 
            } 
        }
 
        class TryReceiveAsyncResult : AsyncResult
        {
            FramingDuplexSessionChannel channel;
            static AsyncCallback onReceive = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnReceive)); 
            bool receiveSuccess;
            Message message; 
 
            public TryReceiveAsyncResult(FramingDuplexSessionChannel channel, TimeSpan timeout,
                AsyncCallback callback, object state) 
                : base(callback, state)
            {
                this.channel = channel;
 
                bool completeSelf = false;
                try 
                { 
                    IAsyncResult result = this.channel.BeginReceive(timeout, onReceive, this);
                    if (result.CompletedSynchronously) 
                    {
                        CompleteReceive(result);
                        completeSelf = true;
                    } 
                }
                catch (TimeoutException e) 
                { 
                    if (DiagnosticUtility.ShouldTraceInformation)
                    { 
                        DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
                    }

                    completeSelf = true; 
                }
 
                if (completeSelf) 
                {
                    base.Complete(true); 
                }
            }

            public static bool End(IAsyncResult result, out Message message) 
            {
                TryReceiveAsyncResult thisPtr = AsyncResult.End(result); 
                message = thisPtr.message; 
                return thisPtr.receiveSuccess;
            } 

            void CompleteReceive(IAsyncResult result)
            {
                this.message = this.channel.EndReceive(result); 
                this.receiveSuccess = true;
            } 
 
            static void OnReceive(IAsyncResult result)
            { 
                if (result.CompletedSynchronously)
                {
                    return;
                } 

                TryReceiveAsyncResult thisPtr = (TryReceiveAsyncResult)result.AsyncState; 
                Exception completionException = null; 
                try
                { 
                    thisPtr.CompleteReceive(result);
                }
                catch (TimeoutException e)
                { 
                    if (DiagnosticUtility.ShouldTraceInformation)
                    { 
                        DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); 
                    }
                } 
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw; 
                    } 

                    completionException = e; 
                }

                thisPtr.Complete(false, completionException);
            } 
        }
    } 
 
    class ClientFramingDuplexSessionChannel : FramingDuplexSessionChannel
    { 
        IConnectionOrientedTransportChannelFactorySettings settings;
        ClientDuplexDecoder decoder;
        StreamUpgradeProvider upgrade;
        ConnectionPoolHelper connectionPoolHelper; 

        public ClientFramingDuplexSessionChannel(ChannelManagerBase factory, IConnectionOrientedTransportChannelFactorySettings settings, 
            EndpointAddress remoteAddresss, Uri via, IConnectionInitiator connectionInitiator, ConnectionPool connectionPool, 
            bool exposeConnectionProperty)
            : base(factory, settings, remoteAddresss, via, exposeConnectionProperty) 
        {
            this.settings = settings;
            this.MessageEncoder = settings.MessageEncoderFactory.CreateSessionEncoder();
            this.upgrade = settings.Upgrade; 
            this.connectionPoolHelper = new DuplexConnectionPoolHelper(this, connectionPool, connectionInitiator);
        } 
 
        ArraySegment CreatePreamble()
        { 
            EncodedVia encodedVia = new EncodedVia(this.Via.AbsoluteUri);
            EncodedContentType encodedContentType = EncodedContentType.Create(this.MessageEncoder.ContentType);

            // calculate preamble length 
            int startSize = ClientDuplexEncoder.ModeBytes.Length + SessionEncoder.CalcStartSize(encodedVia, encodedContentType);
            int preambleEndOffset = 0; 
            if (this.upgrade == null) 
            {
                preambleEndOffset = startSize; 
                startSize += ClientDuplexEncoder.PreambleEndBytes.Length;
            }

            byte[] startBytes = DiagnosticUtility.Utility.AllocateByteArray(startSize); 
            Buffer.BlockCopy(ClientDuplexEncoder.ModeBytes, 0, startBytes, 0, ClientDuplexEncoder.ModeBytes.Length);
            SessionEncoder.EncodeStart(startBytes, ClientDuplexEncoder.ModeBytes.Length, encodedVia, encodedContentType); 
            if (preambleEndOffset > 0) 
            {
                Buffer.BlockCopy(ClientDuplexEncoder.PreambleEndBytes, 0, startBytes, preambleEndOffset, ClientDuplexEncoder.PreambleEndBytes.Length); 
            }

            return new ArraySegment(startBytes, 0, startSize);
        } 

        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) 
        { 
            return new OpenAsyncResult(this, timeout, callback, state);
        } 

        protected override void OnEndOpen(IAsyncResult result)
        {
            OpenAsyncResult.End(result); 
        }
 
        IConnection SendPreamble(IConnection connection, ArraySegment preamble, ref TimeoutHelper timeoutHelper) 
        {
            // initialize a new decoder 
            this.decoder = new ClientDuplexDecoder(0);
            byte[] ackBuffer = new byte[1];
            connection.Write(preamble.Array, preamble.Offset, preamble.Count, true, timeoutHelper.RemainingTime());
 
            if (this.upgrade != null)
            { 
                StreamUpgradeInitiator upgradeInitiator = upgrade.CreateUpgradeInitiator(this.RemoteAddress, this.Via); 

                upgradeInitiator.Open(timeoutHelper.RemainingTime()); 
                if (!ConnectionUpgradeHelper.InitiateUpgrade(upgradeInitiator, ref connection, this.decoder,
                    this, ref timeoutHelper))
                {
                    ConnectionUpgradeHelper.DecodeFramingFault(this.decoder, connection, Via, MessageEncoder.ContentType, ref timeoutHelper); 
                }
 
                SetRemoteSecurity(upgradeInitiator); 
                upgradeInitiator.Close(timeoutHelper.RemainingTime());
                connection.Write(ClientDuplexEncoder.PreambleEndBytes, 0, 
                    ClientDuplexEncoder.PreambleEndBytes.Length, true, timeoutHelper.RemainingTime());
            }

            // read ACK 
            int ackBytesRead = connection.Read(ackBuffer, 0, ackBuffer.Length, timeoutHelper.RemainingTime());
            if (!ConnectionUpgradeHelper.ValidatePreambleResponse(ackBuffer, ackBytesRead, this.decoder, Via)) 
            { 
                ConnectionUpgradeHelper.DecodeFramingFault(this.decoder, connection, Via,
                    MessageEncoder.ContentType, ref timeoutHelper); 
            }

            return connection;
        } 

        IAsyncResult BeginSendPreamble(IConnection connection, ArraySegment preamble, ref TimeoutHelper timeoutHelper, 
            AsyncCallback callback, object state) 
        {
            return new SendPreambleAsyncResult(this, connection, preamble, ref timeoutHelper, callback, state); 
        }

        IConnection EndSendPreamble(IAsyncResult result)
        { 
            return SendPreambleAsyncResult.End(result);
        } 
 
        protected override void OnOpen(TimeSpan timeout)
        { 
            IConnection connection;
            try
            {
                connection = connectionPoolHelper.EstablishConnection(timeout); 
            }
            catch (TimeoutException exception) 
            { 
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                    new TimeoutException(SR.GetString(SR.TimeoutOnOpen, timeout), exception)); 
            }

            bool connectionAccepted = false;
            try 
            {
                AcceptConnection(connection); 
                connectionAccepted = true; 
            }
            finally 
            {
                if (!connectionAccepted)
                {
                    this.connectionPoolHelper.Abort(); 
                }
            } 
        } 

        protected override void ReturnConnectionIfNecessary(bool abort, TimeSpan timeout) 
        {
            lock (ThisLock)
            {
                if (abort) 
                {
                    this.connectionPoolHelper.Abort(); 
                } 
                else
                { 
                    this.connectionPoolHelper.Close(timeout);
                }
            }
        } 

        void AcceptConnection(IConnection connection) 
        { 
            base.SetMessageSource(new ClientDuplexConnectionReader(this, connection, decoder, this.settings, MessageEncoder));
 
            lock (ThisLock)
            {
                if (this.State != CommunicationState.Opening)
                { 
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                        new CommunicationObjectAbortedException(SR.GetString(SR.DuplexChannelAbortedDuringOpen, this.Via))); 
                } 

                this.Connection = connection; 
            }
        }

        void SetRemoteSecurity(StreamUpgradeInitiator upgradeInitiator) 
        {
            this.RemoteSecurity = StreamSecurityUpgradeInitiator.GetRemoteSecurity(upgradeInitiator); 
        } 

        protected override void PrepareMessage(Message message) 
        {
            base.PrepareMessage(message);

            if (this.RemoteSecurity != null) 
            {
                message.Properties.Security = (SecurityMessageProperty)this.RemoteSecurity.CreateCopy(); 
            } 
        }
 
        class DuplexConnectionPoolHelper : ConnectionPoolHelper
        {
            ClientFramingDuplexSessionChannel channel;
            ArraySegment preamble; 

            public DuplexConnectionPoolHelper(ClientFramingDuplexSessionChannel channel, 
                ConnectionPool connectionPool, IConnectionInitiator connectionInitiator) 
                : base(connectionPool, connectionInitiator, channel.Via)
            { 
                this.channel = channel;
                this.preamble = channel.CreatePreamble();
            }
 
            protected override TimeoutException CreateNewConnectionTimeoutException(TimeSpan timeout, TimeoutException innerException)
            { 
                return new TimeoutException(SR.GetString(SR.OpenTimedOutEstablishingTransportSession, 
                        timeout, channel.Via.AbsoluteUri), innerException);
            } 

            protected override IAsyncResult BeginAcceptPooledConnection(IConnection connection, ref TimeoutHelper timeoutHelper, AsyncCallback callback, object state)
            {
                return channel.BeginSendPreamble(connection, preamble, ref timeoutHelper, callback, state); 
            }
 
            protected override IConnection EndAcceptPooledConnection(IAsyncResult result) 
            {
                return channel.EndSendPreamble(result); 
            }

            protected override IConnection AcceptPooledConnection(IConnection connection, ref TimeoutHelper timeoutHelper)
            { 
                return channel.SendPreamble(connection, preamble, ref timeoutHelper);
            } 
        } 

        class SendPreambleAsyncResult : AsyncResult 
        {
            ClientFramingDuplexSessionChannel channel;
            IConnection connection;
            TimeoutHelper timeoutHelper; 
            StreamUpgradeInitiator upgradeInitiator;
            static WaitCallback onReadPreambleAck = new WaitCallback(OnReadPreambleAck); 
            static AsyncCallback onWritePreamble = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWritePreamble)); 
            static AsyncCallback onWritePreambleEnd;
            static AsyncCallback onUpgrade; 
            static AsyncCallback onUpgradeInitiatorOpen;
            static AsyncCallback onUpgradeInitiatorClose;

            public SendPreambleAsyncResult(ClientFramingDuplexSessionChannel channel, 
                IConnection connection, ArraySegment preamble,
                ref TimeoutHelper timeoutHelper, AsyncCallback callback, object state) 
                : base(callback, state) 
            {
                this.channel = channel; 
                this.timeoutHelper = timeoutHelper;
                this.connection = connection;

                // initialize a new decoder 
                channel.decoder = new ClientDuplexDecoder(0);
                IAsyncResult writePreambleResult = connection.BeginWrite( 
                    preamble.Array, preamble.Offset, preamble.Count, 
                        true, timeoutHelper.RemainingTime(), onWritePreamble, this);
 
                if (!writePreambleResult.CompletedSynchronously)
                {
                    return;
                } 

                if (HandleWritePreamble(writePreambleResult)) 
                { 
                    base.Complete(true);
                } 
            }

            public static IConnection End(IAsyncResult result)
            { 
                SendPreambleAsyncResult thisPtr = AsyncResult.End(result);
                return thisPtr.connection; 
            } 

            bool HandleWritePreamble(IAsyncResult result) 
            {
                this.connection.EndWrite(result);

                // now upgrade if necessary 
                if (channel.upgrade != null)
                { 
                    this.upgradeInitiator = channel.upgrade.CreateUpgradeInitiator(channel.RemoteAddress, channel.Via); 
                    if (onUpgradeInitiatorOpen == null)
                    { 
                        onUpgradeInitiatorOpen = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnUpgradeInitiatorOpen));
                    }

                    IAsyncResult initiatorOpenResult = 
                        this.upgradeInitiator.BeginOpen(timeoutHelper.RemainingTime(), onUpgradeInitiatorOpen, this);
 
                    if (!initiatorOpenResult.CompletedSynchronously) 
                    {
                        return false; 
                    }

                    return HandleInitiatorOpen(initiatorOpenResult);
                } 
                else
                { 
                    return ReadAck(); 
                }
            } 

            bool HandleInitiatorOpen(IAsyncResult result)
            {
                this.upgradeInitiator.EndOpen(result); 
                if (onUpgrade == null)
                { 
                    onUpgrade = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnUpgrade)); 
                }
 
                IAsyncResult upgradeResult = ConnectionUpgradeHelper.BeginInitiateUpgrade(
                    channel, channel.RemoteAddress, this.connection, channel.decoder, this.upgradeInitiator,
                    channel.MessageEncoder.ContentType, timeoutHelper, onUpgrade, this);
 
                if (!upgradeResult.CompletedSynchronously)
                { 
                    return false; 
                }
 
                return HandleUpgrade(upgradeResult);
            }

            // finish our upgrade and read ack 
            bool HandleUpgrade(IAsyncResult result)
            { 
                this.connection = ConnectionUpgradeHelper.EndInitiateUpgrade(result); 
                channel.SetRemoteSecurity(this.upgradeInitiator);
 
                if (onUpgradeInitiatorClose == null)
                {
                    onUpgradeInitiatorClose = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnUpgradeInitiatorClose));
                } 

                IAsyncResult initiatorCloseResult = 
                    this.upgradeInitiator.BeginClose(timeoutHelper.RemainingTime(), onUpgradeInitiatorClose, this); 

                if (!initiatorCloseResult.CompletedSynchronously) 
                {
                    return false;
                }
 
                return HandleInitiatorClose(initiatorCloseResult);
            } 
 
            bool HandleInitiatorClose(IAsyncResult result)
            { 
                this.upgradeInitiator.EndClose(result);
                this.upgradeInitiator = null; // we're done with the upgrade

                // in the upgrade case, preamble end bytes aren't sent with the initial bytes; we need to send them here 
                if (onWritePreambleEnd == null)
                { 
                    onWritePreambleEnd = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWritePreambleEnd)); 
                }
 
                IAsyncResult writePreambleResult = this.connection.BeginWrite(
                    ClientDuplexEncoder.PreambleEndBytes, 0, ClientDuplexEncoder.PreambleEndBytes.Length, true,
                    timeoutHelper.RemainingTime(), onWritePreambleEnd, this);
 
                if (!writePreambleResult.CompletedSynchronously)
                { 
                    return false; 
                }
 
                this.connection.EndWrite(writePreambleResult);
                return ReadAck();
            }
 
            bool ReadAck()
            { 
                AsyncReadResult readAckResult = this.connection.BeginRead(0, 1, 
                    timeoutHelper.RemainingTime(), onReadPreambleAck, this);
 
                if (readAckResult == AsyncReadResult.Queued)
                {
                    return false;
                } 

                return HandlePreambleAck(); 
            } 

            bool HandlePreambleAck() 
            {
                int ackBytesRead = connection.EndRead();

                // it's possible to get a fault instead of an ack 
                if (!ConnectionUpgradeHelper.ValidatePreambleResponse(
                    connection.AsyncReadBuffer, ackBytesRead, channel.decoder, channel.Via)) 
                { 
                    IAsyncResult decodeFaultResult = ConnectionUpgradeHelper.BeginDecodeFramingFault(channel.decoder,
                        connection, channel.Via, channel.MessageEncoder.ContentType, ref timeoutHelper, 
                        DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnFailedPreamble)), this);

                    if (!decodeFaultResult.CompletedSynchronously)
                    { 
                        return false;
                    } 
 
                    ConnectionUpgradeHelper.EndDecodeFramingFault(decodeFaultResult);
                    return true; 
                }

                return true;
            } 

            static void OnWritePreamble(IAsyncResult result) 
            { 
                if (result.CompletedSynchronously)
                { 
                    return;
                }

                SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)result.AsyncState; 

                Exception completionException = null; 
                bool completeSelf = false; 
                try
                { 
                    completeSelf = thisPtr.HandleWritePreamble(result);
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e) 
                {
                    if (DiagnosticUtility.IsFatal(e)) 
                    { 
                        throw;
                    } 

                    completeSelf = true;
                    completionException = e;
                } 

                if (completeSelf) 
                { 
                    thisPtr.Complete(false, completionException);
                } 
            }

            static void OnReadPreambleAck(object state)
            { 
                SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)state;
 
                Exception completionException = null; 
                bool completeSelf;
                try 
                {
                    completeSelf = thisPtr.HandlePreambleAck();
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread 
                catch (Exception e)
                { 
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw; 
                    }

                    completeSelf = true;
                    completionException = e; 
                }
 
                if (completeSelf) 
                {
                    thisPtr.Complete(false, completionException); 
                }
            }

            static void OnUpgradeInitiatorOpen(IAsyncResult result) 
            {
                if (result.CompletedSynchronously) 
                { 
                    return;
                } 

                SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)result.AsyncState;

                bool completeSelf = false; 
                Exception completionException = null;
                try 
                { 
                    completeSelf = thisPtr.HandleInitiatorOpen(result);
                } 
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw; 
                    } 

                    completeSelf = true; 
                    completionException = e;
                }

                if (completeSelf) 
                {
                    thisPtr.Complete(false, completionException); 
                } 
            }
 
            static void OnUpgrade(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                { 
                    return;
                } 
 
                SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)result.AsyncState;
 
                bool completeSelf = false;
                Exception completionException = null;
                try
                { 
                    completeSelf = thisPtr.HandleUpgrade(result);
                } 
#pragma warning suppress 56500 // [....], transferring exception to another thread 
                catch (Exception e)
                { 
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw;
                    } 

                    completeSelf = true; 
                    completionException = e; 
                }
 
                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException);
                } 
            }
 
            static void OnUpgradeInitiatorClose(IAsyncResult result) 
            {
                if (result.CompletedSynchronously) 
                {
                    return;
                }
 
                SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)result.AsyncState;
 
                bool completeSelf = false; 
                Exception completionException = null;
                try 
                {
                    completeSelf = thisPtr.HandleInitiatorClose(result);
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread 
                catch (Exception e)
                { 
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw; 
                    }

                    completeSelf = true;
                    completionException = e; 
                }
 
                if (completeSelf) 
                {
                    thisPtr.Complete(false, completionException); 
                }
            }

            static void OnWritePreambleEnd(IAsyncResult result) 
            {
                if (result.CompletedSynchronously) 
                    return; 

                SendPreambleAsyncResult thisPtr = (SendPreambleAsyncResult)result.AsyncState; 
                Exception completionException = null;
                bool completeSelf = false;
                try
                { 
                    thisPtr.connection.EndWrite(result);
                    completeSelf = thisPtr.ReadAck(); 
                } 
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e) 
                {
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw; 
                    }
 
                    completeSelf = true; 
                    completionException = e;
                } 

                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException); 
                }
            } 
 
            void OnFailedPreamble(IAsyncResult result)
            { 
                if (result.CompletedSynchronously)
                {
                    return;
                } 

                Exception completionException = null; 
                try 
                {
                    ConnectionUpgradeHelper.EndDecodeFramingFault(result); 
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e)
                { 
                    if (DiagnosticUtility.IsFatal(e))
                    { 
                        throw; 
                    }
 
                    completionException = e;
                }

                base.Complete(false, completionException); 
            }
        } 
 
        class OpenAsyncResult : AsyncResult
        { 
            static AsyncCallback onEstablishConnection = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnEstablishConnection));
            ClientFramingDuplexSessionChannel duplexChannel;
            TimeoutHelper timeoutHelper;
 
            public OpenAsyncResult(ClientFramingDuplexSessionChannel duplexChannel, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state) 
            { 
                this.timeoutHelper = new TimeoutHelper(timeout);
                this.duplexChannel = duplexChannel; 

                IAsyncResult result;
                try
                { 
                    result = duplexChannel.connectionPoolHelper.BeginEstablishConnection(
                        timeoutHelper.RemainingTime(), onEstablishConnection, this); 
                } 
                catch (TimeoutException exception)
                { 
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                        new TimeoutException(SR.GetString(SR.TimeoutOnOpen, timeout), exception));
                }
 
                if (!result.CompletedSynchronously)
                { 
                    return; 
                }
 
                if (HandleEstablishConnection(result))
                {
                    base.Complete(true);
                } 
            }
 
            public static void End(IAsyncResult result) 
            {
                AsyncResult.End(result); 
            }

            bool HandleEstablishConnection(IAsyncResult result)
            { 
                IConnection connection;
                try 
                { 
                    connection = duplexChannel.connectionPoolHelper.EndEstablishConnection(result);
                } 
                catch (TimeoutException exception)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                        new TimeoutException(SR.GetString(SR.TimeoutOnOpen, this.timeoutHelper.OriginalTimeout), exception)); 
                }
 
                duplexChannel.AcceptConnection(connection); 
                return true;
            } 

            static void OnEstablishConnection(IAsyncResult result)
            {
                if (result.CompletedSynchronously) 
                {
                    return; 
                } 

                OpenAsyncResult thisPtr = (OpenAsyncResult)result.AsyncState; 

                Exception completionException = null;
                bool completeSelf;
                try 
                {
                    completeSelf = thisPtr.HandleEstablishConnection(result); 
                } 
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e) 
                {
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw; 
                    }
 
                    completeSelf = true; 
                    completionException = e;
                } 

                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException); 
                }
            } 
        } 
    }
 
    // used by StreamedFramingRequestChannel and ClientFramingDuplexSessionChannel
    class ConnectionUpgradeHelper
    {
        public static IAsyncResult BeginDecodeFramingFault(ClientFramingDecoder decoder, IConnection connection, 
            Uri via, string contentType, ref TimeoutHelper timeoutHelper, AsyncCallback callback, object state)
        { 
            return new DecodeFailedUpgradeAsyncResult(decoder, connection, via, contentType, ref timeoutHelper, 
                callback, state);
        } 

        public static void EndDecodeFramingFault(IAsyncResult result)
        {
            DecodeFailedUpgradeAsyncResult.End(result); 
        }
 
        public static void DecodeFramingFault(ClientFramingDecoder decoder, IConnection connection, 
            Uri via, string contentType, ref TimeoutHelper timeoutHelper)
        { 
            ValidateReadingFaultString(decoder);

            int offset = 0;
            byte[] faultBuffer = DiagnosticUtility.Utility.AllocateByteArray(FaultStringDecoder.FaultSizeQuota); 
            int size = connection.Read(faultBuffer, offset, faultBuffer.Length, timeoutHelper.RemainingTime());
 
            while (size > 0) 
            {
                int bytesDecoded = decoder.Decode(faultBuffer, offset, size); 
                offset += bytesDecoded;
                size -= bytesDecoded;

                if (decoder.CurrentState == ClientFramingDecoderState.Fault) 
                {
                    ConnectionUtilities.CloseNoThrow(connection, timeoutHelper.RemainingTime()); 
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError( 
                        FaultStringDecoder.GetFaultException(decoder.Fault, via.ToString(), contentType));
                } 
                else
                {
                    if (decoder.CurrentState != ClientFramingDecoderState.ReadingFaultString)
                    { 
                        DiagnosticUtility.DebugAssert("invalid framing client state machine");
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperInternal(false); 
                    } 
                    if (size == 0)
                    { 
                        offset = 0;
                        size = connection.Read(faultBuffer, offset, faultBuffer.Length, timeoutHelper.RemainingTime());
                    }
                } 
            }
 
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException()); 
        }
 
        public static IAsyncResult BeginInitiateUpgrade(IDefaultCommunicationTimeouts timeouts, EndpointAddress remoteAddress,
            IConnection connection, ClientFramingDecoder decoder,
            StreamUpgradeInitiator upgradeInitiator, string contentType, TimeoutHelper timeoutHelper,
            AsyncCallback callback, object state) 
        {
            return new InitiateUpgradeAsyncResult(timeouts, remoteAddress, connection, decoder, upgradeInitiator, contentType, timeoutHelper, 
                callback, state); 
        }
 
        public static IConnection EndInitiateUpgrade(IAsyncResult result)
        {
            return InitiateUpgradeAsyncResult.End(result);
        } 

        public static bool InitiateUpgrade(StreamUpgradeInitiator upgradeInitiator, ref IConnection connection, 
            ClientFramingDecoder decoder, IDefaultCommunicationTimeouts defaultTimeouts, ref TimeoutHelper timeoutHelper) 
        {
            string upgradeContentType = upgradeInitiator.GetNextUpgrade(); 

            while (upgradeContentType != null)
            {
                EncodedUpgrade encodedUpgrade = new EncodedUpgrade(upgradeContentType); 
                // write upgrade request framing for synchronization
                connection.Write(encodedUpgrade.EncodedBytes, 0, encodedUpgrade.EncodedBytes.Length, true, timeoutHelper.RemainingTime()); 
                byte[] buffer = new byte[1]; 

                // read upgrade response framing 
                int size = connection.Read(buffer, 0, buffer.Length, timeoutHelper.RemainingTime());

                if (!ValidateUpgradeResponse(buffer, size, decoder)) // we have a problem
                { 
                    return false;
                } 
 
                // initiate wire upgrade
                ConnectionStream connectionStream = new ConnectionStream(connection, defaultTimeouts); 
                Stream upgradedStream = upgradeInitiator.InitiateUpgrade(connectionStream);

                // and re-wrap connection
                connection = new StreamConnection(upgradedStream, connectionStream); 

                upgradeContentType = upgradeInitiator.GetNextUpgrade(); 
            } 

            return true; 
        }

        static void ValidateReadingFaultString(ClientFramingDecoder decoder)
        { 
            if (decoder.CurrentState != ClientFramingDecoderState.ReadingFaultString)
            { 
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new System.ServiceModel.Security.MessageSecurityException( 
                    SR.GetString(SR.ServerRejectedUpgradeRequest)));
            } 
        }

        public static bool ValidatePreambleResponse(byte[] buffer, int count, ClientFramingDecoder decoder, Uri via)
        { 
            if (count == 0)
            { 
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError( 
                    new ProtocolException(SR.GetString(SR.ServerRejectedSessionPreamble, via),
                    decoder.CreatePrematureEOFException())); 
            }

            // decode until the framing byte has been processed (it always will be)
            while (decoder.Decode(buffer, 0, count) == 0) 
            {
                // do nothing 
            } 

            if (decoder.CurrentState != ClientFramingDecoderState.Start) // we have a problem 
            {
                return false;
            }
 
            return true;
        } 
 
        static bool ValidateUpgradeResponse(byte[] buffer, int count, ClientFramingDecoder decoder)
        { 
            if (count == 0)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new MessageSecurityException(SR.GetString(SR.ServerRejectedUpgradeRequest), decoder.CreatePrematureEOFException()));
            } 

            // decode until the framing byte has been processed (it always will be) 
            while (decoder.Decode(buffer, 0, count) == 0) 
            {
                // do nothing 
            }

            if (decoder.CurrentState != ClientFramingDecoderState.UpgradeResponse) // we have a problem
            { 
                return false;
            } 
 
            return true;
        } 

        class DecodeFailedUpgradeAsyncResult : AsyncResult
        {
            ClientFramingDecoder decoder; 
            IConnection connection;
            Uri via; 
            string contentType; 
            TimeoutHelper timeoutHelper;
            static WaitCallback onReadFaultData = new WaitCallback(OnReadFaultData); 

            public DecodeFailedUpgradeAsyncResult(ClientFramingDecoder decoder, IConnection connection,
                Uri via, string contentType, ref TimeoutHelper timeoutHelper,
                AsyncCallback callback, object state) 
                : base(callback, state)
            { 
                ValidateReadingFaultString(decoder); 

                this.decoder = decoder; 
                this.connection = connection;
                this.via = via;
                this.contentType = contentType;
                this.timeoutHelper = timeoutHelper; 

                if (connection.BeginRead(0, Math.Min(FaultStringDecoder.FaultSizeQuota, connection.AsyncReadBufferSize), 
                    timeoutHelper.RemainingTime(), onReadFaultData, this) == AsyncReadResult.Queued) 
                {
                    return; 
                }

                CompleteReadFaultData();
            } 

            void CompleteReadFaultData() 
            { 
                int offset = 0;
                int size = connection.EndRead(); 

                while (size > 0)
                {
                    int bytesDecoded = decoder.Decode(connection.AsyncReadBuffer, offset, size); 
                    offset += bytesDecoded;
                    size -= bytesDecoded; 
 
                    if (decoder.CurrentState == ClientFramingDecoderState.Fault)
                    { 
                        ConnectionUtilities.CloseNoThrow(connection, timeoutHelper.RemainingTime());
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                            FaultStringDecoder.GetFaultException(decoder.Fault, via.ToString(), contentType));
                    } 
                    else
                    { 
                        if (decoder.CurrentState != ClientFramingDecoderState.ReadingFaultString) 
                        {
                            DiagnosticUtility.DebugAssert("invalid framing client state machine"); 
                            throw DiagnosticUtility.ExceptionUtility.ThrowHelperInternal(false);
                        }
                        if (size == 0)
                        { 
                            offset = 0;
                            if (connection.BeginRead(0, Math.Min(FaultStringDecoder.FaultSizeQuota, connection.AsyncReadBufferSize), 
                                timeoutHelper.RemainingTime(), onReadFaultData, this) == AsyncReadResult.Queued) 
                            {
                                return; 
                            }

                            size = connection.EndRead();
                        } 
                    }
                } 
 
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
            } 

            public static void End(IAsyncResult result)
            {
                AsyncResult.End(result); 
            }
 
            static void OnReadFaultData(object state) 
            {
                DecodeFailedUpgradeAsyncResult thisPtr = (DecodeFailedUpgradeAsyncResult)state; 

                // This AsyncResult only completes with an exception.
                Exception completionException = null;
                try 
                {
                    thisPtr.CompleteReadFaultData(); 
                } 
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e) 
                {
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw; 
                    }
 
                    completionException = e; 
                }
                if (completionException != null) 
                {
                    thisPtr.Complete(false, completionException);
                }
            } 
        }
 
        class InitiateUpgradeAsyncResult : AsyncResult 
        {
            IDefaultCommunicationTimeouts defaultTimeouts; 
            IConnection connection;
            ConnectionStream connectionStream;
            string contentType;
            ClientFramingDecoder decoder; 
            static AsyncCallback onInitiateUpgrade = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnInitiateUpgrade));
            static WaitCallback onReadUpgradeResponse = new WaitCallback(OnReadUpgradeResponse); 
            static AsyncCallback onFailedUpgrade; 
            static AsyncCallback onWriteUpgradeBytes = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWriteUpgradeBytes));
            EndpointAddress remoteAddress; 
            StreamUpgradeInitiator upgradeInitiator;
            TimeoutHelper timeoutHelper;

            public InitiateUpgradeAsyncResult(IDefaultCommunicationTimeouts timeouts, EndpointAddress remoteAddress, 
                IConnection connection,
                ClientFramingDecoder decoder, StreamUpgradeInitiator upgradeInitiator, 
                string contentType, TimeoutHelper timeoutHelper, 
                AsyncCallback callback, object state)
                : base(callback, state) 
            {
                this.defaultTimeouts = timeouts;
                this.decoder = decoder;
                this.upgradeInitiator = upgradeInitiator; 
                this.contentType = contentType;
                this.timeoutHelper = timeoutHelper; 
                this.connection = connection; 
                this.remoteAddress = remoteAddress;
                if (Begin()) 
                {
                    base.Complete(true);
                }
            } 

            bool Begin() 
            { 
                string upgradeContentType = upgradeInitiator.GetNextUpgrade();
 
                while (upgradeContentType != null)
                {
                    EncodedUpgrade encodedUpgrade = new EncodedUpgrade(upgradeContentType);
                    IAsyncResult writeFrameResult = connection.BeginWrite( 
                        encodedUpgrade.EncodedBytes, 0, encodedUpgrade.EncodedBytes.Length, true, timeoutHelper.RemainingTime(),
                        onWriteUpgradeBytes, this); 
                    if (!writeFrameResult.CompletedSynchronously) 
                    {
                        return false; 
                    }

                    if (!CompleteWriteUpgradeBytes(writeFrameResult))
                    { 
                        return false;
                    } 
 
                    upgradeContentType = upgradeInitiator.GetNextUpgrade();
                } 

                return true;
            }
 
            bool CompleteWriteUpgradeBytes(IAsyncResult result)
            { 
                connection.EndWrite(result); 

                if (connection.BeginRead(0, ServerSessionEncoder.UpgradeResponseBytes.Length, timeoutHelper.RemainingTime(), 
                    onReadUpgradeResponse, this) == AsyncReadResult.Queued)
                {
                    return false;
                } 

                return CompleteReadUpgradeResponse(); 
            } 

            bool CompleteReadUpgradeResponse() 
            {
                int bytesRead = connection.EndRead();

                if (!ConnectionUpgradeHelper.ValidateUpgradeResponse(connection.AsyncReadBuffer, bytesRead, decoder)) 
                {
                    if (onFailedUpgrade == null) 
                    { 
                        onFailedUpgrade = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnFailedUpgrade));
                    } 

                    IAsyncResult result = ConnectionUpgradeHelper.BeginDecodeFramingFault(decoder, connection,
                        remoteAddress.Uri, contentType, ref timeoutHelper, onFailedUpgrade, this);
 
                    if (result.CompletedSynchronously)
                    { 
                        ConnectionUpgradeHelper.EndDecodeFramingFault(result); 
                    }
 
                    return result.CompletedSynchronously;
                }

                this.connectionStream = new ConnectionStream(connection, this.defaultTimeouts); 
                IAsyncResult initiateUpgradeResult = upgradeInitiator.BeginInitiateUpgrade(connectionStream, onInitiateUpgrade, this);
                if (!initiateUpgradeResult.CompletedSynchronously) 
                    return false; 

                CompleteUpgrade(initiateUpgradeResult); 
                return true;
            }

            void CompleteUpgrade(IAsyncResult result) 
            {
                Stream stream = upgradeInitiator.EndInitiateUpgrade(result); 
                this.connection = new StreamConnection(stream, connectionStream); 
            }
 
            public static IConnection End(IAsyncResult result)
            {
                InitiateUpgradeAsyncResult thisPtr = AsyncResult.End(result);
                return thisPtr.connection; 
            }
 
            static void OnReadUpgradeResponse(object state) 
            {
                InitiateUpgradeAsyncResult thisPtr = (InitiateUpgradeAsyncResult)state; 

                Exception completionException = null;
                bool completeSelf = false;
                try 
                {
                    if (thisPtr.CompleteReadUpgradeResponse()) 
                    { 
                        completeSelf = thisPtr.Begin();
                    } 
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e)
                { 
                    if (DiagnosticUtility.IsFatal(e))
                    { 
                        throw; 
                    }
 
                    completeSelf = true;
                    completionException = e;
                }
                if (completeSelf) 
                {
                    thisPtr.Complete(false, completionException); 
                } 
            }
 
            static void OnFailedUpgrade(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                { 
                    return;
                } 
 
                InitiateUpgradeAsyncResult thisPtr = (InitiateUpgradeAsyncResult)result.AsyncState;
 
                Exception completionException = null;
                try
                {
                    ConnectionUpgradeHelper.EndDecodeFramingFault(result); 
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread 
                catch (Exception e) 
                {
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw;
                    }
 
                    completionException = e;
                } 
                thisPtr.Complete(false, completionException); 
            }
 
            static void OnWriteUpgradeBytes(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                    return; 

                InitiateUpgradeAsyncResult thisPtr = (InitiateUpgradeAsyncResult)result.AsyncState; 
 
                Exception completionException = null;
                bool completeSelf = false; 
                try
                {
                    if (thisPtr.CompleteWriteUpgradeBytes(result))
                    { 
                        completeSelf = thisPtr.Begin();
                    } 
                } 
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e) 
                {
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw; 
                    }
 
                    completeSelf = true; 
                    completionException = e;
                } 
                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException);
                } 
            }
 
            static void OnInitiateUpgrade(IAsyncResult result) 
            {
                if (result.CompletedSynchronously) 
                    return;

                InitiateUpgradeAsyncResult thisPtr = (InitiateUpgradeAsyncResult)result.AsyncState;
 
                Exception completionException = null;
                bool completeSelf; 
                try 
                {
                    thisPtr.CompleteUpgrade(result); 
                    completeSelf = thisPtr.Begin();
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e) 
                {
                    if (DiagnosticUtility.IsFatal(e)) 
                    { 
                        throw;
                    } 

                    completeSelf = true;
                    completionException = e;
                } 
                if (completeSelf)
                { 
                    thisPtr.Complete(false, completionException); 
                }
            } 
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK