SingletonConnectionReader.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / SingletonConnectionReader.cs / 1 / SingletonConnectionReader.cs

                            //------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------

namespace System.ServiceModel.Channels 
{
    using System; 
    using System.ServiceModel; 
    using System.ServiceModel.Activation;
    using System.ServiceModel.Dispatcher; 
    using System.ServiceModel.Description;
    using System.IO;
    using System.IdentityModel.Claims;
    using System.IdentityModel.Policy; 
    using System.ServiceModel.Security;
 
    using System.Threading; 
    using System.ServiceModel.Diagnostics;
    using System.Xml; 
    using System.Diagnostics;
    using System.Runtime.CompilerServices;
    using System.Net;
 
    delegate void ServerSingletonPreambleCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader);
    delegate ISingletonChannelListener SingletonPreambleDemuxCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader); 
    interface ISingletonChannelListener 
    {
        TimeSpan ReceiveTimeout { get; } 
        void ReceiveRequest(RequestContext requestContext, ItemDequeuedCallback callback, bool canDispatchOnThisThread);
    }

    class ServerSingletonPreambleConnectionReader : InitialServerConnectionReader 
    {
        ServerSingletonDecoder decoder; 
        ServerSingletonPreambleCallback callback; 
        WaitCallback onAsyncReadComplete;
        IConnectionOrientedTransportFactorySettings transportSettings; 
        TransportSettingsCallback transportSettingsCallback;
        SecurityMessageProperty security;
        Uri via;
        IConnection rawConnection; 
        byte[] connectionBuffer;
        bool isReadPending; 
        int offset; 
        int size;
        TimeoutHelper receiveTimeoutHelper; 
        OnViaDelegate viaDelegate;

        public ServerSingletonPreambleConnectionReader(IConnection connection, ItemDequeuedCallback connectionDequeuedCallback,
            long streamPosition, int offset, int size, TransportSettingsCallback transportSettingsCallback, 
            ConnectionClosedCallback closedCallback, ServerSingletonPreambleCallback callback)
            : base(connection, closedCallback) 
        { 
            this.decoder = new ServerSingletonDecoder(streamPosition, MaxViaSize, MaxContentTypeSize);
            this.offset = offset; 
            this.size = size;
            this.callback = callback;
            this.transportSettingsCallback = transportSettingsCallback;
            this.rawConnection = connection; 
            this.ConnectionDequeuedCallback = connectionDequeuedCallback;
 
        } 

        public int BufferOffset 
        {
            get { return this.offset; }
        }
 
        public int BufferSize
        { 
            get { return this.size; } 
        }
 
        public ServerSingletonDecoder Decoder
        {
            get { return this.decoder; }
        } 

        public IConnection RawConnection 
        { 
            get { return this.rawConnection; }
        } 

        public Uri Via
        {
            get { return this.via; } 
        }
 
        public IConnectionOrientedTransportFactorySettings TransportSettings 
        {
            get { return this.transportSettings; } 
        }

        public SecurityMessageProperty Security
        { 
            get { return this.security; }
        } 
 
        TimeSpan GetRemainingTimeout()
        { 
            return this.receiveTimeoutHelper.RemainingTime();
        }

        void ReadAndDispatch() 
        {
            bool success = false; 
            try 
            {
                while ((size > 0 || !isReadPending) && !IsClosed) 
                {
                    if (size == 0)
                    {
                        isReadPending = true; 
                        if (onAsyncReadComplete == null)
                        { 
                            onAsyncReadComplete = new WaitCallback(OnAsyncReadComplete); 
                        }
 
                        if (Connection.BeginRead(0, connectionBuffer.Length, GetRemainingTimeout(),
                            onAsyncReadComplete, null) == AsyncReadResult.Queued)
                        {
                            break; 
                        }
                        HandleReadComplete(); 
                    } 

                    int bytesRead = decoder.Decode(connectionBuffer, offset, size); 
                    if (bytesRead > 0)
                    {
                        offset += bytesRead;
                        size -= bytesRead; 
                    }
 
                    if (decoder.CurrentState == ServerSingletonDecoder.State.PreUpgradeStart) 
                    {
                        this.via = decoder.Via; 
                        if (!Connection.Validate(via))
                        {
                            // This goes through the failure path (Abort) even though it doesn't throw.
                            return; 
                        }
                        if (viaDelegate != null) 
                        { 
                            try
                            { 
                                viaDelegate(via);
                            }
                            catch (ServiceActivationException e)
                            { 
                                if (DiagnosticUtility.ShouldTraceInformation)
                                { 
                                    DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); 
                                }
                                // return fault and close connection 
                                SendFault(FramingEncodingString.ServiceActivationFailedFault);
                                break;
                            }
                        } 

 
                        this.transportSettings = transportSettingsCallback(via); 

                        if (transportSettings == null) 
                        {
                            EndpointNotFoundException e = new EndpointNotFoundException(SR.GetString(SR.EndpointNotFound, decoder.Via));
                            if (DiagnosticUtility.ShouldTraceInformation)
                            { 
                                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
                            } 
 
                            SendFault(FramingEncodingString.EndpointNotFoundFault);
                            return; 
                        }

                        // we have enough information to hand off to a channel. Our job is done
                        callback(this); 
                        break;
                    } 
                } 
                success = true;
            } 
            catch (CommunicationException exception)
            {
                if (DiagnosticUtility.ShouldTraceInformation)
                { 
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Information);
                } 
            } 
            catch (TimeoutException exception)
            { 
                if (DiagnosticUtility.ShouldTraceInformation)
                {
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Information);
                } 
            }
            catch (Exception e) 
            { 
                if (DiagnosticUtility.IsFatal(e))
                { 
                    throw;
                }
                if (!ExceptionHandler.HandleTransportExceptionHelper(e))
                { 
                    throw;
                } 
 
                // containment -- we abort ourselves for any error, no extra containment needed
            } 
            finally
            {
                if (!success)
                { 
                    Abort();
                } 
            } 
        }
 
        public void SendFault(string faultString)
        {
            SendFault(faultString, ref this.receiveTimeoutHelper);
        } 

        void SendFault(string faultString, ref TimeoutHelper timeoutHelper) 
        { 
            InitialServerConnectionReader.SendFault(Connection, faultString,
                connectionBuffer, timeoutHelper.RemainingTime(), TransportDefaults.MaxDrainSize); 
        }

        // finish preamble (upgrade and ACK)
        public IConnection CompletePreamble(TimeSpan timeout) 
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 
            if (!transportSettings.MessageEncoderFactory.Encoder.IsContentTypeSupported(decoder.ContentType)) 
            {
                SendFault(FramingEncodingString.ContentTypeInvalidFault, ref timeoutHelper); 
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ProtocolException(SR.GetString(
                    SR.ContentTypeMismatch, decoder.ContentType, transportSettings.MessageEncoderFactory.Encoder.ContentType)));
            }
 
            StreamUpgradeAcceptor upgradeAcceptor = null;
            StreamUpgradeProvider upgrade = transportSettings.Upgrade; 
 
            if (upgrade != null)
            { 
                upgradeAcceptor = upgrade.CreateUpgradeAcceptor();
            }

            IConnection currentConnection = this.Connection; 
            for (; ; )
            { 
                if (size == 0) 
                {
                    offset = 0; 
                    size = currentConnection.Read(connectionBuffer, 0, connectionBuffer.Length, timeoutHelper.RemainingTime());
                    if (size == 0)
                    {
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException()); 
                    }
                } 
 
                for (; ; )
                { 
                    int bytesRead = decoder.Decode(connectionBuffer, offset, size);
                    if (bytesRead > 0)
                    {
                        offset += bytesRead; 
                        size -= bytesRead;
                    } 
 
                    switch (decoder.CurrentState)
                    { 
                        case ServerSingletonDecoder.State.UpgradeRequest:
                            if (upgradeAcceptor == null)
                            {
                                SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper); 
                                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                                    new ProtocolException(SR.GetString(SR.UpgradeRequestToNonupgradableService, decoder.Upgrade))); 
                            } 

                            if (!upgradeAcceptor.CanUpgrade(decoder.Upgrade)) 
                            {
                                SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
                                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ProtocolException(SR.GetString(SR.UpgradeProtocolNotSupported, decoder.Upgrade)));
                            } 

                            // accept upgrade 
                            currentConnection.Write(ServerSingletonEncoder.UpgradeResponseBytes, 0, ServerSingletonEncoder.UpgradeResponseBytes.Length, true, timeoutHelper.RemainingTime()); 

                            IConnection connectionToUpgrade = currentConnection; 
                            if (this.size > 0)
                            {
                                connectionToUpgrade = new PreReadConnection(connectionToUpgrade, this.connectionBuffer, this.offset, this.size);
                            } 

                            try 
                            { 
                                currentConnection = InitialServerConnectionReader.UpgradeConnection(connectionToUpgrade, upgradeAcceptor, transportSettings);
                                connectionBuffer = currentConnection.AsyncReadBuffer; 
                            }
#pragma warning suppress 56500
                            catch (Exception exception)
                            { 
                                if (DiagnosticUtility.IsFatal(exception))
                                    throw; 
 
                                // Audit Authentication Failure
                                WriteAuditFailure(upgradeAcceptor as StreamSecurityUpgradeAcceptor, exception); 
                                throw;
                            }
                            break;
 
                        case ServerSingletonDecoder.State.Start:
                            SetupSecurityIfNecessary(upgradeAcceptor); 
 
                            // we've finished the preamble. Ack and return.
                            currentConnection.Write(ServerSessionEncoder.AckResponseBytes, 0, 
                                ServerSessionEncoder.AckResponseBytes.Length, true, timeoutHelper.RemainingTime());
                            return currentConnection;
                    }
 
                    if (size == 0)
                    { 
                        break; 
                    }
                } 
            }
        }

 

        void SetupSecurityIfNecessary(StreamUpgradeAcceptor upgradeAcceptor) 
        { 
             StreamSecurityUpgradeAcceptor securityUpgradeAcceptor = upgradeAcceptor as StreamSecurityUpgradeAcceptor;
             if (securityUpgradeAcceptor != null) 
             {
                 this.security = securityUpgradeAcceptor.GetRemoteSecurity();
                 if (this.security == null)
                 { 
                     Exception securityFailedException = new ProtocolException(
                     SR.GetString(SR.RemoteSecurityNotNegotiatedOnStreamUpgrade, this.Via)); 
                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(securityFailedException); 
                 }
                 // Audit Authentication Success 
                 WriteAuditEvent(securityUpgradeAcceptor, AuditLevel.Success, null);
             }
        }
 
        #region Transport Security Auditing
        void WriteAuditFailure(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, Exception exception) 
        { 
            try
            { 
                WriteAuditEvent(securityUpgradeAcceptor, AuditLevel.Failure, exception);
            }
#pragma warning suppress 56500 // covered by FxCop
            catch (Exception auditException) 
            {
                if (DiagnosticUtility.IsFatal(auditException)) 
                { 
                    throw;
                } 

                DiagnosticUtility.ExceptionUtility.TraceHandledException(auditException, TraceEventType.Error);
            }
        } 

        void WriteAuditEvent(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, AuditLevel auditLevel, Exception exception) 
        { 
            if ((this.transportSettings.AuditBehavior.MessageAuthenticationAuditLevel & auditLevel) != auditLevel)
            { 
                return;
            }

            if (securityUpgradeAcceptor == null) 
            {
                return; 
            } 
            String primaryIdentity = String.Empty;
            SecurityMessageProperty clientSecurity = securityUpgradeAcceptor.GetRemoteSecurity(); 
            if (clientSecurity != null)
            {
                primaryIdentity = GetIdentityNameFromContext(clientSecurity);
            } 

            ServiceSecurityAuditBehavior auditBehavior = this.transportSettings.AuditBehavior; 
 
            if (auditLevel == AuditLevel.Success)
            { 
                SecurityAuditHelper.WriteTransportAuthenticationSuccessEvent(auditBehavior.AuditLogLocation,
                    auditBehavior.SuppressAuditFailure, null, this.Via, primaryIdentity);
            }
            else 
            {
                SecurityAuditHelper.WriteTransportAuthenticationFailureEvent(auditBehavior.AuditLogLocation, 
                    auditBehavior.SuppressAuditFailure, null, this.Via, primaryIdentity, exception); 
            }
        } 

        [MethodImpl(MethodImplOptions.NoInlining)]
        static string GetIdentityNameFromContext(SecurityMessageProperty clientSecurity)
        { 
            return SecurityUtils.GetIdentityNamesFromContext(
                clientSecurity.ServiceSecurityContext.AuthorizationContext); 
        } 
        #endregion
 
        void HandleReadComplete()
        {
            offset = 0;
            size = Connection.EndRead(); 
            isReadPending = false;
            if (size == 0) 
            { 
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
            } 
        }

        void OnAsyncReadComplete(object state)
        { 
            bool success = false;
            try 
            { 
                HandleReadComplete();
                ReadAndDispatch(); 
                success = true;
            }
            catch (CommunicationException exception)
            { 
                if (DiagnosticUtility.ShouldTraceInformation)
                { 
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Information); 
                }
            } 
            catch (TimeoutException exception)
            {
                if (DiagnosticUtility.ShouldTraceInformation)
                { 
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Information);
                } 
            } 
            catch (Exception e)
            { 
                if (DiagnosticUtility.IsFatal(e))
                {
                    throw;
                } 
                if (!ExceptionHandler.HandleTransportExceptionHelper(e))
                { 
                    throw; 
                }
 
                // containment -- we abort ourselves for any error, no extra containment needed
            }
            finally
            { 
                if (!success)
                { 
                    Abort(); 
                }
            } 
        }

        public void StartReading(OnViaDelegate viaDelegate, TimeSpan timeout)
        { 
            this.viaDelegate = viaDelegate;
            this.receiveTimeoutHelper = new TimeoutHelper(timeout); 
            this.connectionBuffer = Connection.AsyncReadBuffer; 
            ReadAndDispatch();
        } 
    }

    class ServerSingletonConnectionReader : SingletonConnectionReader
    { 
        ConnectionDemuxer connectionDemuxer;
        ServerSingletonDecoder decoder; 
        IConnection rawConnection; 
        string contentType;
 
        public ServerSingletonConnectionReader(ServerSingletonPreambleConnectionReader preambleReader,
            IConnection upgradedConnection, ConnectionDemuxer connectionDemuxer)
            : base(upgradedConnection, preambleReader.BufferOffset, preambleReader.BufferSize,
            preambleReader.Security, preambleReader.TransportSettings, preambleReader.Via) 
        {
            this.decoder = preambleReader.Decoder; 
            this.contentType = this.decoder.ContentType; 
            this.connectionDemuxer = connectionDemuxer;
            this.rawConnection = preambleReader.RawConnection; 
        }

        protected override string ContentType
        { 
            get { return this.contentType; }
        } 
 
        protected override long StreamPosition
        { 
            get { return this.decoder.StreamPosition; }
        }

        protected override bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof) 
        {
            while (size > 0) 
            { 
                int bytesRead = decoder.Decode(buffer, offset, size);
                if (bytesRead > 0) 
                {
                    offset += bytesRead;
                    size -= bytesRead;
                } 

                switch (decoder.CurrentState) 
                { 
                    case ServerSingletonDecoder.State.EnvelopeStart:
                        // we're at the envelope 
                        return true;

                    case ServerSingletonDecoder.State.End:
                        isAtEof = true; 
                        return false;
                } 
            } 

            return false; 
        }

        protected override void OnClose(TimeSpan timeout)
        { 
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            // send back EOF and then recycle the connection 
            this.Connection.Write(SingletonEncoder.EndBytes, 0, SingletonEncoder.EndBytes.Length, true, timeoutHelper.RemainingTime()); 
            this.connectionDemuxer.ReuseConnection(this.rawConnection, timeoutHelper.RemainingTime());
        } 

        protected override void PrepareMessage(Message message)
        {
            base.PrepareMessage(message); 
            IPEndPoint remoteEndPoint = this.rawConnection.RemoteIPEndPoint;
 
            // pipes will return null 
            if (remoteEndPoint != null)
            { 
                RemoteEndpointMessageProperty remoteEndpointProperty = new RemoteEndpointMessageProperty(remoteEndPoint);
                message.Properties.Add(RemoteEndpointMessageProperty.Name, remoteEndpointProperty);
            }
        } 
    }
 
    abstract class SingletonConnectionReader 
    {
        IConnection connection; 
        bool doneReceiving;
        bool doneSending;
        bool isAtEof;
        bool isClosed; 
        SecurityMessageProperty security;
        object thisLock = new object(); 
        int offset; 
        int size;
        IConnectionOrientedTransportFactorySettings transportSettings; 
        Uri via;
        Stream inputStream;

        protected SingletonConnectionReader(IConnection connection, int offset, int size, SecurityMessageProperty security, 
            IConnectionOrientedTransportFactorySettings transportSettings, Uri via)
        { 
            this.connection = connection; 
            this.offset = offset;
            this.size = size; 
            this.security = security;
            this.transportSettings = transportSettings;
            this.via = via;
        } 

        protected IConnection Connection 
        { 
            get
            { 
                return this.connection;
            }
        }
 
        protected object ThisLock
        { 
            get 
            {
                return this.thisLock; 
            }
        }

        protected virtual string ContentType 
        {
            get { return null; } 
        } 

        protected abstract long StreamPosition { get; } 

        public void Abort()
        {
            this.connection.Abort(); 
        }
 
        public void DoneReceiving(bool atEof) 
        {
            DoneReceiving(atEof, this.transportSettings.CloseTimeout); 
        }

        void DoneReceiving(bool atEof, TimeSpan timeout)
        { 
            if (!this.doneReceiving)
            { 
                this.isAtEof = atEof; 
                this.doneReceiving = true;
 
                if (this.doneSending)
                {
                    this.Close(timeout);
                } 
            }
        } 
 
        public void Close(TimeSpan timeout)
        { 
            lock (ThisLock)
            {
                if (this.isClosed)
                { 
                    return;
                } 
 
                this.isClosed = true;
            } 

            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            bool success = false;
            try 
            {
                // first drain our stream if necessary 
                if (this.inputStream != null) 
                {
                    byte[] dummy = DiagnosticUtility.Utility.AllocateByteArray(transportSettings.ConnectionBufferSize); 
                    while (!this.isAtEof)
                    {
                        this.inputStream.ReadTimeout = TimeoutHelper.ToMilliseconds(timeoutHelper.RemainingTime());
                        int bytesRead = this.inputStream.Read(dummy, 0, dummy.Length); 
                        if (bytesRead == 0)
                        { 
                            this.isAtEof = true; 
                        }
                    } 
                }
                OnClose(timeoutHelper.RemainingTime());
                success = true;
            } 
            finally
            { 
                if (!success) 
                {
                    this.Abort(); 
                }
            }
        }
 
        protected abstract void OnClose(TimeSpan timeout);
 
        public void DoneSending(TimeSpan timeout) 
        {
            this.doneSending = true; 
            if (this.doneReceiving)
            {
                this.Close(timeout);
            } 
        }
 
        protected abstract bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof); 

        protected virtual void PrepareMessage(Message message) 
        {
            message.Properties.Via = this.via;
            message.Properties.Security = (this.security != null) ? (SecurityMessageProperty)this.security.CreateCopy() : null;
        } 

        public RequestContext ReceiveRequest(TimeSpan timeout) 
        { 
            Message requestMessage = Receive(timeout);
            return new StreamedFramingRequestContext(this, requestMessage); 
        }

        public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
        { 
            return new ReceiveAsyncResult(this, timeout, callback, state);
        } 
 
        public virtual Message EndReceive(IAsyncResult result)
        { 
            return ReceiveAsyncResult.End(result);
        }

        public Message Receive(TimeSpan timeout) 
        {
            byte[] buffer = DiagnosticUtility.Utility.AllocateByteArray(connection.AsyncReadBufferSize); 
 
            if (size > 0)
            { 
                Buffer.BlockCopy(connection.AsyncReadBuffer, offset, buffer, offset, size);
            }

            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 
            for (; ; )
            { 
                if (DecodeBytes(buffer, ref offset, ref size, ref isAtEof)) 
                {
                    break; 
                }

                if (this.isAtEof)
                { 
                    DoneReceiving(true, timeoutHelper.RemainingTime());
                    return null; 
                } 

                if (size == 0) 
                {
                    offset = 0;
                    size = connection.Read(buffer, 0, buffer.Length, timeoutHelper.RemainingTime());
                    if (size == 0) 
                    {
                        DoneReceiving(true, timeoutHelper.RemainingTime()); 
                        return null; 
                    }
                } 
            }

            // we're ready to read a message
            IConnection singletonConnection = this.connection; 
            if (size > 0)
            { 
                byte[] initialData = DiagnosticUtility.Utility.AllocateByteArray(size); 
                Buffer.BlockCopy(buffer, offset, initialData, 0, size);
                singletonConnection = new PreReadConnection(singletonConnection, initialData); 
            }

            Stream connectionStream = new SingletonInputConnectionStream(this, singletonConnection, this.transportSettings);
            this.inputStream = new MaxMessageSizeStream(connectionStream, transportSettings.MaxReceivedMessageSize); 
            using (ServiceModelActivity activity = DiagnosticUtility.ShouldUseActivity ? ServiceModelActivity.CreateBoundedActivity(true) : null)
            { 
                if (DiagnosticUtility.ShouldUseActivity) 
                {
                    ServiceModelActivity.Start(activity, SR.GetString(SR.ActivityProcessingMessage, TraceUtility.RetrieveMessageNumber()), ActivityType.ProcessMessage); 
                }

                Message message = null;
                try 
                {
                    message = transportSettings.MessageEncoderFactory.Encoder.ReadMessage( 
                        this.inputStream, transportSettings.MaxBufferSize, this.ContentType); 
                }
                catch (XmlException xmlException) 
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                        new ProtocolException(SR.GetString(SR.MessageXmlProtocolError), xmlException));
                } 

                if (DiagnosticUtility.ShouldUseActivity) 
                { 
                    TraceUtility.TransferFromTransport(message);
                } 

                PrepareMessage(message);

                return message; 
            }
        } 
 
        class ReceiveAsyncResult : AsyncResult
        { 
            static WaitCallback onReceiveScheduled = new WaitCallback(OnReceiveScheduled);

            Message message;
            SingletonConnectionReader parent; 
            TimeSpan timeout;
 
            public ReceiveAsyncResult(SingletonConnectionReader parent, TimeSpan timeout, AsyncCallback callback, 
                object state)
                : base(callback, state) 
            {
                this.parent = parent;
                this.timeout = timeout;
 
                //
                IOThreadScheduler.ScheduleCallback(onReceiveScheduled, this); 
            } 

            public static Message End(IAsyncResult result) 
            {
                ReceiveAsyncResult receiveAsyncResult = AsyncResult.End(result);
                return receiveAsyncResult.message;
            } 

            static void OnReceiveScheduled(object state) 
            { 
                ReceiveAsyncResult thisPtr = (ReceiveAsyncResult)state;
 
                Exception completionException = null;
                try
                {
                    thisPtr.message = thisPtr.parent.Receive(thisPtr.timeout); 
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread 
                catch (Exception exception) 
                {
                    if (DiagnosticUtility.IsFatal(exception)) 
                    {
                        throw;
                    }
                    completionException = exception; 
                }
                thisPtr.Complete(false, completionException); 
            } 
        }
 
        class StreamedFramingRequestContext : RequestContextBase
        {
            IConnection connection;
            SingletonConnectionReader parent; 
            IConnectionOrientedTransportFactorySettings settings;
            TimeoutHelper timeoutHelper; 
 
            public StreamedFramingRequestContext(SingletonConnectionReader parent, Message requestMessage)
                : base(requestMessage, parent.transportSettings.CloseTimeout, parent.transportSettings.SendTimeout) 
            {
                this.parent = parent;
                this.connection = parent.connection;
                this.settings = parent.transportSettings; 
            }
 
            protected override void OnAbort() 
            {
                this.parent.Abort(); 
            }

            protected override void OnClose(TimeSpan timeout)
            { 
                this.parent.Close(timeout);
            } 
 
            protected override void OnReply(Message message, TimeSpan timeout)
            { 
                timeoutHelper = new TimeoutHelper(timeout);
                StreamingConnectionHelper.WriteMessage(message, this.connection, false, this.settings, ref timeoutHelper);
                parent.DoneSending(timeoutHelper.RemainingTime());
            } 

            protected override IAsyncResult OnBeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state) 
            { 
                timeoutHelper = new TimeoutHelper(timeout);
                return StreamingConnectionHelper.BeginWriteMessage(message, this.connection, false, this.settings, 
                    ref timeoutHelper, callback, state);
            }

            protected override void OnEndReply(IAsyncResult result) 
            {
                StreamingConnectionHelper.EndWriteMessage(result); 
                parent.DoneSending(timeoutHelper.RemainingTime()); 
            }
        } 

        // ensures that the reader is notified at end-of-stream, and takes care of the framing chunk headers
        class SingletonInputConnectionStream : ConnectionStream
        { 
            SingletonMessageDecoder decoder;
            SingletonConnectionReader reader; 
            bool atEof; 
            byte[] chunkBuffer; // used for when we have overflow
            int chunkBufferOffset; 
            int chunkBufferSize;
            int chunkBytesRemaining;

            public SingletonInputConnectionStream(SingletonConnectionReader reader, IConnection connection, 
                IDefaultCommunicationTimeouts defaultTimeouts)
                : base(connection, defaultTimeouts) 
            { 
                this.reader = reader;
                this.decoder = new SingletonMessageDecoder(reader.StreamPosition); 
                this.chunkBytesRemaining = 0;
                this.chunkBuffer = new byte[IntEncoder.MaxEncodedSize];
            }
 
            void AbortReader()
            { 
                this.reader.Abort(); 
            }
 
            public override void Close()
            {
                this.reader.DoneReceiving(this.atEof);
            } 

            // run chunk data through the decoder 
            void DecodeData(byte[] buffer, int offset, int size) 
            {
                while (size > 0) 
                {
                    int bytesRead = decoder.Decode(buffer, offset, size);
                    offset += bytesRead;
                    size -= bytesRead; 
                    DiagnosticUtility.DebugAssert(decoder.CurrentState == SingletonMessageDecoder.State.ReadingEnvelopeBytes || decoder.CurrentState == SingletonMessageDecoder.State.ChunkEnd, "");
                } 
            } 

            // run the current data through the decoder to get valid message bytes 
            void DecodeSize(byte[] buffer, ref int offset, ref int size)
            {
                while (size > 0)
                { 
                    int bytesRead = decoder.Decode(buffer, offset, size);
 
                    if (bytesRead > 0) 
                    {
                        offset += bytesRead; 
                        size -= bytesRead;
                    }

                    switch (decoder.CurrentState) 
                    {
                        case SingletonMessageDecoder.State.ChunkStart: 
                            this.chunkBytesRemaining = decoder.ChunkSize; 

                            // if we have overflow and we're not decoding out of our buffer, copy over 
                            if (size > 0 && !object.ReferenceEquals(buffer, this.chunkBuffer))
                            {
                                DiagnosticUtility.DebugAssert(size <= this.chunkBuffer.Length, "");
                                Buffer.BlockCopy(buffer, offset, this.chunkBuffer, 0, size); 
                                this.chunkBufferOffset = 0;
                                this.chunkBufferSize = size; 
                            } 
                            return;
 
                        case SingletonMessageDecoder.State.End:
                            ProcessEof();
                            return;
                    } 
                }
            } 
 
            int ReadCore(byte[] buffer, int offset, int count)
            { 
                int bytesRead = -1;
                try
                {
                    bytesRead = base.Read(buffer, offset, count); 
                    if (bytesRead == 0)
                    { 
                        ProcessEof(); 
                    }
                } 
                finally
                {
                    if (bytesRead == -1)  // there was an exception
                    { 
                        AbortReader();
                    } 
                } 

                return bytesRead; 
            }

            public override int Read(byte[] buffer, int offset, int count)
            { 
                int result = 0;
                while (true) 
                { 
                    if (count == 0)
                    { 
                        return result;
                    }

                    if (this.atEof) 
                    {
                        return result; 
                    } 

                    // first deal with any residual carryover 
                    if (this.chunkBufferSize > 0)
                    {
                        int bytesToCopy = Math.Min(chunkBytesRemaining,
                            Math.Min(this.chunkBufferSize, count)); 

                        Buffer.BlockCopy(this.chunkBuffer, this.chunkBufferOffset, buffer, offset, bytesToCopy); 
                        // keep decoder up to date 
                        DecodeData(this.chunkBuffer, this.chunkBufferOffset, bytesToCopy);
 
                        this.chunkBufferOffset += bytesToCopy;
                        this.chunkBufferSize -= bytesToCopy;
                        this.chunkBytesRemaining -= bytesToCopy;
                        if (this.chunkBytesRemaining == 0 && this.chunkBufferSize > 0) 
                        {
                            DecodeSize(this.chunkBuffer, ref this.chunkBufferOffset, ref this.chunkBufferSize); 
                        } 

                        result += bytesToCopy; 
                        offset += bytesToCopy;
                        count -= bytesToCopy;
                    }
                    else if (chunkBytesRemaining > 0) 
                    {
                        // We're in the middle of a chunk. Try and include the next chunk size as well 
                        int bytesRead = ReadCore(buffer, offset, Math.Min(count, chunkBytesRemaining + IntEncoder.MaxEncodedSize)); 

                        // keep decoder up to date 
                        DecodeData(buffer, offset, Math.Min(bytesRead, this.chunkBytesRemaining));

                        if (bytesRead > chunkBytesRemaining)
                        { 
                            result += this.chunkBytesRemaining;
                            int overflowCount = bytesRead - chunkBytesRemaining; 
                            int overflowOffset = offset + chunkBytesRemaining; 
                            this.chunkBytesRemaining = 0;
                            // read at least part of the next chunk, and put any overflow in this.chunkBuffer 
                            DecodeSize(buffer, ref overflowOffset, ref overflowCount);
                        }
                        else
                        { 
                            result += bytesRead;
                            this.chunkBytesRemaining -= bytesRead; 
                        } 

                        return result; 
                    }
                    else
                    {
                        // Final case: we have a new chunk. Read the size, and loop around again 
                        if (count < IntEncoder.MaxEncodedSize)
                        { 
                            // we don't have space for MaxEncodedSize, so it's worth the copy cost to read into a temp buffer 
                            this.chunkBufferOffset = 0;
                            this.chunkBufferSize = ReadCore(this.chunkBuffer, 0, this.chunkBuffer.Length); 
                            DecodeSize(this.chunkBuffer, ref this.chunkBufferOffset, ref this.chunkBufferSize);
                        }
                        else
                        { 
                            int bytesRead = ReadCore(buffer, offset, IntEncoder.MaxEncodedSize);
                            int sizeOffset = offset; 
                            DecodeSize(buffer, ref sizeOffset, ref bytesRead); 
                        }
                    } 
                }
            }

            void ProcessEof() 
            {
                if (!this.atEof) 
                { 
                    this.atEof = true;
                    if (this.chunkBufferSize > 0 || this.chunkBytesRemaining > 0 
                        || decoder.CurrentState != SingletonMessageDecoder.State.End)
                    {
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
                    } 

                    this.reader.DoneReceiving(true); 
                } 
            }
 
            public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
            {
                return new ReadAsyncResult(this, buffer, offset, count, callback, state);
            } 

            public override int EndRead(IAsyncResult result) 
            { 
                return ReadAsyncResult.End(result);
            } 

            public class ReadAsyncResult : AsyncResult
            {
                SingletonInputConnectionStream parent; 
                int result;
 
                public ReadAsyncResult(SingletonInputConnectionStream parent, 
                    byte[] buffer, int offset, int count, AsyncCallback callback, object state)
                    : base(callback, state) 
                {
                    this.parent = parent;

                    // 
                    this.result = this.parent.Read(buffer, offset, count);
                    base.Complete(true); 
                } 

                public static int End(IAsyncResult result) 
                {
                    ReadAsyncResult thisPtr = AsyncResult.End(result);
                    return thisPtr.result;
                } 
            }
        } 
    } 

    static class StreamingConnectionHelper 
    {
        public static void WriteMessage(Message message, IConnection connection, bool isRequest,
            IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper)
        { 
            byte[] endBytes = null;
            if (message != null) 
            { 
                MessageEncoder messageEncoder = settings.MessageEncoderFactory.Encoder;
                byte[] envelopeStartBytes = SingletonEncoder.EnvelopeStartBytes; 

                bool writeStreamed;
                if (isRequest)
                { 
                    endBytes = SingletonEncoder.EnvelopeEndFramingEndBytes;
                    writeStreamed = TransferModeHelper.IsRequestStreamed(settings.TransferMode); 
                } 
                else
                { 
                    endBytes = SingletonEncoder.EnvelopeEndBytes;
                    writeStreamed = TransferModeHelper.IsResponseStreamed(settings.TransferMode);
                }
 
                if (writeStreamed)
                { 
                    connection.Write(envelopeStartBytes, 0, envelopeStartBytes.Length, false, timeoutHelper.RemainingTime()); 
                    Stream connectionStream = new StreamingOutputConnectionStream(connection, settings);
                    Stream writeTimeoutStream = new TimeoutStream(connectionStream, ref timeoutHelper); 
                    messageEncoder.WriteMessage(message, writeTimeoutStream);
                }
                else
                { 
                    ArraySegment messageData = messageEncoder.WriteMessage(message,
                        int.MaxValue, settings.BufferManager, envelopeStartBytes.Length + IntEncoder.MaxEncodedSize); 
                    messageData = SingletonEncoder.EncodeMessageFrame(messageData); 
                    Buffer.BlockCopy(envelopeStartBytes, 0, messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                        envelopeStartBytes.Length); 
                    connection.Write(messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                        messageData.Count + envelopeStartBytes.Length, true, timeoutHelper.RemainingTime(), settings.BufferManager);
                }
            } 
            else if (isRequest) // context handles response end bytes
            { 
                endBytes = SingletonEncoder.EndBytes; 
            }
 
            if (endBytes != null)
            {
                connection.Write(endBytes, 0, endBytes.Length,
                    true, timeoutHelper.RemainingTime()); 
            }
        } 
 
        public static IAsyncResult BeginWriteMessage(Message message, IConnection connection, bool isRequest,
            IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper, 
            AsyncCallback callback, object state)
        {
            return new WriteMessageAsyncResult(message, connection, isRequest, settings, ref timeoutHelper, callback, state);
        } 

        public static void EndWriteMessage(IAsyncResult result) 
        { 
            WriteMessageAsyncResult.End(result);
        } 

        // overrides ConnectionStream to add a Framing int at the beginning of each record
        class StreamingOutputConnectionStream : ConnectionStream
        { 
            byte[] encodedSize;
 
            public StreamingOutputConnectionStream(IConnection connection, IDefaultCommunicationTimeouts timeouts) 
                : base(connection, timeouts)
            { 
                this.encodedSize = new byte[IntEncoder.MaxEncodedSize];
            }
            void WriteChunkSize(int size)
            { 
                if (size > 0)
                { 
                    int bytesEncoded = IntEncoder.Encode(size, encodedSize, 0); 
                    base.Connection.Write(encodedSize, 0, bytesEncoded, false, TimeSpan.FromMilliseconds(this.WriteTimeout));
                } 
            }

            public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
            { 
                WriteChunkSize(count);
                return base.BeginWrite(buffer, offset, count, callback, state); 
            } 

            public override void WriteByte(byte value) 
            {
                WriteChunkSize(1);
                base.WriteByte(value);
            } 

            public override void Write(byte[] buffer, int offset, int count) 
            { 
                WriteChunkSize(count);
                base.Write(buffer, offset, count); 
            }
        }

        class WriteMessageAsyncResult : AsyncResult 
        {
            IConnection connection; 
            MessageEncoder encoder; 
            BufferManager bufferManager;
            Message message; 
            static AsyncCallback onWriteBufferedMessage;
            static AsyncCallback onWriteStartBytes;
            static WaitCallback onWriteStartBytesScheduled;
            static AsyncCallback onWriteEndBytes = 
                DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWriteEndBytes));
            byte[] bufferToFree; 
            IConnectionOrientedTransportFactorySettings settings; 
            TimeoutHelper timeoutHelper;
            byte[] endBytes; 

            public WriteMessageAsyncResult(Message message, IConnection connection, bool isRequest,
                IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper,
                AsyncCallback callback, object state) 
                : base(callback, state)
            { 
                this.connection = connection; 
                this.encoder = settings.MessageEncoderFactory.Encoder;
                this.bufferManager = settings.BufferManager; 
                this.timeoutHelper = timeoutHelper;
                this.message = message;
                this.settings = settings;
 
                bool throwing = true;
                bool completeSelf = false; 
                if (message == null) 
                {
                    if (isRequest) // context takes care of the end bytes on Close/reader.EOF 
                    {
                        this.endBytes = SingletonEncoder.EndBytes;
                    }
                    completeSelf = WriteEndBytes(); 
                }
                else 
                { 
                    try
                    { 
                        byte[] envelopeStartBytes = SingletonEncoder.EnvelopeStartBytes;
                        bool writeStreamed;
                        if (isRequest)
                        { 
                            this.endBytes = SingletonEncoder.EnvelopeEndFramingEndBytes;
                            writeStreamed = TransferModeHelper.IsRequestStreamed(settings.TransferMode); 
                        } 
                        else
                        { 
                            this.endBytes = SingletonEncoder.EnvelopeEndBytes;
                            writeStreamed = TransferModeHelper.IsResponseStreamed(settings.TransferMode);
                        }
 
                        if (writeStreamed)
                        { 
                            if (onWriteStartBytes == null) 
                            {
                                onWriteStartBytes = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWriteStartBytes)); 
                            }

                            IAsyncResult writeStartBytesResult = connection.BeginWrite(envelopeStartBytes, 0, envelopeStartBytes.Length, true,
                                timeoutHelper.RemainingTime(), onWriteStartBytes, this); 

                            if (writeStartBytesResult.CompletedSynchronously) 
                            { 
                                if (onWriteStartBytesScheduled == null)
                                { 
                                    onWriteStartBytesScheduled = new WaitCallback(OnWriteStartBytesScheduled);
                                }
                                IOThreadScheduler.ScheduleCallback(onWriteStartBytesScheduled, writeStartBytesResult);
                            } 
                        }
                        else 
                        { 
                            ArraySegment messageData = settings.MessageEncoderFactory.Encoder.WriteMessage(message,
                                int.MaxValue, this.bufferManager, envelopeStartBytes.Length + IntEncoder.MaxEncodedSize); 
                            messageData = SingletonEncoder.EncodeMessageFrame(messageData);
                            this.bufferToFree = messageData.Array;
                            Buffer.BlockCopy(envelopeStartBytes, 0, messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                                envelopeStartBytes.Length); 

                            if (onWriteBufferedMessage == null) 
                            { 
                                onWriteBufferedMessage = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWriteBufferedMessage));
                            } 
                            IAsyncResult writeBufferedResult =
                                connection.BeginWrite(messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                                messageData.Count + envelopeStartBytes.Length, true, timeoutHelper.RemainingTime(),
                                onWriteBufferedMessage, this); 

                            if (writeBufferedResult.CompletedSynchronously) 
                            { 
                                completeSelf = HandleWriteBufferedMessage(writeBufferedResult);
                            } 
                        }
                        throwing = false;
                    }
                    finally 
                    {
                        if (throwing) 
                        { 
                            Cleanup();
                        } 
                    }
                }

                if (completeSelf) 
                {
                    base.Complete(true); 
                } 
            }
 
            public static void End(IAsyncResult result)
            {
                AsyncResult.End(result);
            } 

            void Cleanup() 
            { 
                if (bufferToFree != null)
                { 
                    this.bufferManager.ReturnBuffer(bufferToFree);
                }
            }
 
            bool HandleWriteStartBytes(IAsyncResult result)
            { 
                connection.EndWrite(result); 
                Stream connectionStream = new StreamingOutputConnectionStream(connection, settings);
                Stream writeTimeoutStream = new TimeoutStream(connectionStream, ref timeoutHelper); 
                this.encoder.WriteMessage(message, writeTimeoutStream);
                return WriteEndBytes();
            }
 
            bool HandleWriteBufferedMessage(IAsyncResult result)
            { 
                this.connection.EndWrite(result); 
                return WriteEndBytes();
            } 

            bool WriteEndBytes()
            {
                if (this.endBytes == null) 
                {
                    Cleanup(); 
                    return true; 
                }
 
                IAsyncResult result = connection.BeginWrite(endBytes, 0,
                    endBytes.Length, true, timeoutHelper.RemainingTime(), onWriteEndBytes, this);

                if (!result.CompletedSynchronously) 
                {
                    return false; 
                } 

                return HandleWriteEndBytes(result); 
            }

            bool HandleWriteEndBytes(IAsyncResult result)
            { 
                this.connection.EndWrite(result);
                Cleanup(); 
                return true; 
            }
 
            static void OnWriteStartBytes(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                { 
                    return;
                } 
 
                OnWriteStartBytesCallbackHelper(result);
            } 

            static void OnWriteStartBytesScheduled(object state)
            {
                OnWriteStartBytesCallbackHelper((IAsyncResult) state); 
            }
 
            static void OnWriteStartBytesCallbackHelper(IAsyncResult result) 
            {
                WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)result.AsyncState; 

                Exception completionException = null;
                bool completeSelf = false;
                bool throwing = true; 
                try
                { 
                    completeSelf = thisPtr.HandleWriteStartBytes(result); 
                    throwing = false;
                } 
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw; 
                    } 
                    completeSelf = true;
                    completionException = e; 
                }
                finally
                {
                    if (throwing) 
                    {
                        thisPtr.Cleanup(); 
                    } 
                }
 
                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException);
                } 
            }
 
            static void OnWriteBufferedMessage(IAsyncResult result) 
            {
                if (result.CompletedSynchronously) 
                {
                    return;
                }
 
                WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)result.AsyncState;
 
                Exception completionException = null; 
                bool completeSelf = false;
                bool throwing = true; 
                try
                {
                    completeSelf = thisPtr.HandleWriteBufferedMessage(result);
                    throwing = false; 
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread 
                catch (Exception e) 
                {
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw;
                    }
                    completeSelf = true; 
                    completionException = e;
                } 
                finally 
                {
                    if (throwing) 
                    {
                        thisPtr.Cleanup();
                    }
                } 
                if (completeSelf)
                { 
                    thisPtr.Complete(false, completionException); 
                }
            } 

            static void OnWriteEndBytes(IAsyncResult result)
            {
                if (result.CompletedSynchronously) 
                {
                    return; 
                } 

                WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)result.AsyncState; 

                Exception completionException = null;
                bool completeSelf = false;
                bool success = false; 
                try
                { 
                    completeSelf = thisPtr.HandleWriteEndBytes(result); 
                    success = true;
                } 
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e)) 
                    {
                        throw; 
                    } 
                    completeSelf = true;
                    completionException = e; 
                }
                finally
                {
                    if (!success) 
                    {
                        thisPtr.Cleanup(); 
                    } 
                }
 
                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException);
                } 
            }
        } 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK