UdpDuplexChannel.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / NetFx40 / System.ServiceModel.Discovery / System / ServiceModel / Channels / UdpDuplexChannel.cs / 1305376 / UdpDuplexChannel.cs

                            //---------------------------------------------------------------- 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//---------------------------------------------------------------

namespace System.ServiceModel.Channels 
{
    using System; 
    using System.Collections.Generic; 
    using System.Net;
    using System.Net.Sockets; 
    using System.Runtime;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Discovery;
    using System.Threading; 
    using System.Xml;
 
    abstract class UdpDuplexChannel : DuplexChannel, IUdpReceiveHandler 
    {
        bool cleanedUp; 
        int maxPendingMessageCount;
        int maxReceivedMessageSize;
        SynchronizedRandom randomNumberGenerator;
        AsyncWaitHandle retransmissionDoneWaitHandle; 
        UdpRetransmissionSettings retransmitSettings;
        Dictionary retransmitList; 
        Uri via; 

        protected UdpDuplexChannel(ChannelManagerBase channelManager, MessageEncoder encoder, BufferManager bufferManager, 
            UdpSocket[] sendSockets, UdpRetransmissionSettings retransmissionSettings,
            int maxPendingMessageCount, EndpointAddress localAddress, Uri via, bool isMulticast, int maxReceivedMessageSize)
            : base(channelManager, localAddress)
        { 
            Fx.Assert(encoder != null, "encoder shouldn't be null");
            Fx.Assert(bufferManager != null, "buffer manager shouldn't be null"); 
            Fx.Assert(sendSockets != null, "sendSockets can't be null"); 
            Fx.Assert(sendSockets.Length > 0, "sendSockets can't be empty");
            Fx.Assert(retransmissionSettings != null, "retransmissionSettings can't be null"); 
            Fx.Assert(maxPendingMessageCount > 0, "maxPendingMessageCount must be > 0");
            Fx.Assert(maxReceivedMessageSize > 0, "maxReceivedMessageSize must be > 0");
            Fx.Assert(via != null, "via can't be null");
 
            this.maxPendingMessageCount = maxPendingMessageCount;
            this.Encoder = encoder; 
            this.Sockets = sendSockets; 
            this.BufferManager = bufferManager;
            this.retransmitSettings = retransmissionSettings; 
            this.IsMulticast = isMulticast;
            this.DuplicateDetector = null;
            this.ReceiveManager = null;
            this.OwnsBufferManager = false; 
            this.maxReceivedMessageSize = maxReceivedMessageSize;
            this.via = via; 
 
            if (retransmitSettings.Enabled)
            { 
                this.retransmitList = new Dictionary();
                this.randomNumberGenerator = new SynchronizedRandom(AppDomain.CurrentDomain.GetHashCode() | Environment.TickCount);
            }
        } 

        protected abstract bool IgnoreSerializationException { get; } 
 
        public override EndpointAddress RemoteAddress
        { 
            get { return null; }
        }

        public override Uri Via 
        {
            get { return via; } 
        } 

        protected bool OwnsBufferManager { get; set; } 
        protected DuplicateMessageDetector DuplicateDetector { get; set; }
        protected UdpSocketReceiveManager ReceiveManager { get; set; }

        protected BufferManager BufferManager 
        {
            get; 
            private set; 
        }
 
        protected MessageEncoder Encoder
        {
            get;
            private set; 
        }
 
        protected bool IsMulticast 
        {
            get; 
            private set;
        }

        protected UdpSocket[] Sockets 
        {
            get; 
            private set; 
        }
 
        public override T GetProperty()
        {
            if (typeof(T) == typeof(IDuplexChannel))
            { 
                return (T)(object)this;
            } 
 
            T messageEncoderProperty = this.Encoder.GetProperty();
            if (messageEncoderProperty != null) 
            {
                return messageEncoderProperty;
            }
 
            return base.GetProperty();
        } 
 
        internal virtual void HandleReceiveException(Exception ex)
        { 
            base.EnqueueAndDispatch(UdpUtility.WrapAsyncException(ex), null, false);
        }

        int IUdpReceiveHandler.MaxReceivedMessageSize 
        {
            get { return this.maxReceivedMessageSize; } 
        } 

        //returns false if the message was dropped because the max pending message count was hit. 
        bool IUdpReceiveHandler.HandleDataReceived(ArraySegment data, EndPoint remoteEndpoint, int interfaceIndex, Action onMessageDequeuedCallback)
        {
            bool returnBuffer = true;
            string messageHash = null; 
            Message message = null;
            bool continueReceiving = true; 
 
            try
            { 
                IPEndPoint remoteIPEndPoint = (IPEndPoint)remoteEndpoint;

                message = UdpUtility.DecodeMessage(this.DuplicateDetector, this.Encoder, this.BufferManager,
                    data, remoteIPEndPoint, interfaceIndex, this.IgnoreSerializationException, out messageHash); 

                if (message != null) 
                { 
                    continueReceiving = EnqueueMessage(message, onMessageDequeuedCallback);
                    returnBuffer = !continueReceiving; 
                }
            }
            catch (Exception e)
            { 
                if (Fx.IsFatal(e))
                { 
                    returnBuffer = false; 
                    throw;
                } 

                HandleReceiveException(e);
            }
            finally 
            {
                if (returnBuffer) 
                { 
                    if (message != null)
                    { 
                        if (this.DuplicateDetector != null)
                        {
                            Fx.Assert(messageHash != null, "message hash should always be available if duplicate detector is enabled");
                            this.DuplicateDetector.RemoveEntry(messageHash); 
                        }
 
                        message.Close(); // implicitly returns the buffer 
                    }
                    else 
                    {
                        this.BufferManager.ReturnBuffer(data.Array);
                    }
                } 
            }
 
            return continueReceiving; 
        }
 
        void IUdpReceiveHandler.HandleAsyncException(Exception ex)
        {
            HandleReceiveException(ex);
        } 

        //Since ChannelListener and channel lifetimes can be different, we need a 
        //way to transfer the socketReceiveManager and DuplicateMessageDetection 
        //objects to the channel if the listener gets closed.  If this method succeeds, then
        //this also indicates that the bufferManager is no longer owned by the channel listener, 
        //so we have to clean that up also.
        internal bool TransferReceiveManagerOwnership(UdpSocketReceiveManager socketReceiveManager,
            DuplicateMessageDetector duplicateDetector)
        { 
            bool success = false;
            if (this.State == CommunicationState.Opened) 
            { 
                lock (ThisLock)
                { 
                    if (this.State == CommunicationState.Opened)
                    {
                        Fx.Assert(this.ReceiveManager == null, "ReceiveManager is already set to a non-null value");
                        Fx.Assert(this.DuplicateDetector == null, "DuplicateDetector is already set to a non-null value"); 

                        this.ReceiveManager = socketReceiveManager; 
                        this.OwnsBufferManager = true; 
                        this.ReceiveManager.SetReceiveHandler(this);
                        this.DuplicateDetector = duplicateDetector; 
                        success = true;
                    }
                }
            } 
            return success;
        } 
 
        //returns false if the max pending message count was hit.
        internal bool EnqueueMessage(Message message, Action onMessageDequeuedCallback) 
        {
            bool success = false;
            lock (this.ThisLock)
            { 
                if (base.InternalPendingItems < this.maxPendingMessageCount)
                { 
                    message.Properties.Via = this.Via; 
                    base.EnqueueAndDispatch(message, onMessageDequeuedCallback, false);
                    success = true; 
                }
                else
                {
                    if (TD.MaxPendingMessageCountReachedIsEnabled()) 
                    {
                        TD.MaxPendingMessageCountReached(message.Headers.MessageId != null ? message.Headers.MessageId.ToString() : "NULL", 
                            this.maxPendingMessageCount); 
                    }
                } 
            }

            return success;
 
        }
 
        internal ArraySegment EncodeMessage(Message message) 
        {
            return this.Encoder.WriteMessage(message, Int32.MaxValue, this.BufferManager); 
        }

        //will either return a valid socket or will set exceptionToBeThrown
        protected UdpSocket GetSendSocket(IPAddress address, Uri destination, out Exception exceptionToBeThrown) 
        {
            Fx.Assert(this.IsMulticast == false, "This overload should only be used for unicast."); 
 
            UdpSocket result = null;
            exceptionToBeThrown = null; 
            AddressFamily family = address.AddressFamily;

            lock (ThisLock)
            { 
                if (this.State == CommunicationState.Opened)
                { 
                    for (int i = 0; i < this.Sockets.Length; i++) 
                    {
                        if (family == this.Sockets[i].AddressFamily) 
                        {
                            result = this.Sockets[i];
                            break;
                        } 
                    }
 
                    if (result == null) 
                    {
                        exceptionToBeThrown = new InvalidOperationException(SR.RemoteAddressUnreachableDueToIPVersionMismatch(destination)); 
                    }
                }
                else
                { 
                    exceptionToBeThrown = CreateObjectDisposedException();
                } 
            } 

 

            return result;
        }
 
        //will either return a valid socket or will set exceptionToBeThrown
        protected UdpSocket GetSendSocket(int interfaceIndex, out Exception exceptionToBeThrown) 
        { 
            Fx.Assert(this.IsMulticast == true, "This overload should only be used for multicast.");
 
            UdpSocket result = null;
            exceptionToBeThrown = null;

            lock (ThisLock) 
            {
                if (this.State == CommunicationState.Opened) 
                { 
                    for (int i = 0; i < this.Sockets.Length; i++)
                    { 
                        if (interfaceIndex == this.Sockets[i].InterfaceIndex)
                        {
                            result = this.Sockets[i];
                            break; 
                        }
                    } 
 
                    if (result == null)
                    { 
                        exceptionToBeThrown = new InvalidOperationException(SR.UdpSendFailedInterfaceIndexMatchNotFound(interfaceIndex));
                    }
                }
                else 
                {
                    exceptionToBeThrown = CreateObjectDisposedException(); 
                } 
            }
 
            return result;
        }

        //Must return non-null/non-empty array unless exceptionToBeThrown is has been set 
        protected virtual UdpSocket[] GetSendSockets(Message message, out IPEndPoint remoteEndPoint, out Exception exceptionToBeThrown)
        { 
            Fx.Assert(message != null, "message can't be null"); 

            UdpSocket[] socketList = null; 
            exceptionToBeThrown = null;

            remoteEndPoint = null;
            Uri destination; 
            bool isVia = false;
 
            if (message.Properties.Via != null) 
            {
                destination = message.Properties.Via; 
                isVia = true;
            }
            else if (message.Headers.To != null)
            { 
                destination = message.Headers.To;
            } 
            else 
            {
                throw FxTrace.Exception.AsError(new InvalidOperationException(SR.ToOrViaRequired)); 
            }

            ValidateDestinationUri(destination, isVia);
 
            if (destination.HostNameType == UriHostNameType.IPv4 || destination.HostNameType == UriHostNameType.IPv6)
            { 
                remoteEndPoint = new IPEndPoint(IPAddress.Parse(destination.DnsSafeHost), destination.Port); 

                if (this.IsMulticast) 
                {
                    UdpSocket socket = GetSendSocketUsingInterfaceIndex(message.Properties, out exceptionToBeThrown);

                    if (socket != null) 
                    {
                        if (socket.AddressFamily == remoteEndPoint.AddressFamily) 
                        { 
                            socketList = new UdpSocket[] { socket };
                        } 
                        else
                        {
                            exceptionToBeThrown = new InvalidOperationException(SR.RemoteAddressUnreachableDueToIPVersionMismatch(destination.DnsSafeHost));
                        } 
                    }
                } 
                else 
                {
                    UdpSocket socket = GetSendSocket(remoteEndPoint.Address, destination, out exceptionToBeThrown); 
                    if (socket != null)
                    {
                        socketList = new UdpSocket[] { socket };
                    } 
                }
            } 
            else 
            {
                IPAddress[] remoteAddresses = DnsCache.Resolve(destination.DnsSafeHost).AddressList; 

                if (this.IsMulticast)
                {
                    UdpSocket socket = GetSendSocketUsingInterfaceIndex(message.Properties, out exceptionToBeThrown); 

                    if (socket != null) 
                    { 
                        socketList = new UdpSocket[] { socket };
 
                        for (int i = 0; i < remoteAddresses.Length; i++)
                        {
                            if (remoteAddresses[i].AddressFamily == socket.AddressFamily)
                            { 
                                remoteEndPoint = new IPEndPoint(remoteAddresses[i], destination.Port);
                                break; 
                            } 
                        }
 
                        if (remoteEndPoint == null)
                        {
                            //for multicast, we only listen on either IPv4 or IPv6 (not both).
                            //if we didn't find a matching remote endpoint, then it would indicate that 
                            //the remote host didn't resolve to an address we can use...
                            exceptionToBeThrown = new InvalidOperationException(SR.RemoteAddressUnreachableDueToIPVersionMismatch(destination.DnsSafeHost)); 
                        } 
                    }
                } 
                else
                {
                    bool useIPv4 = true;
                    bool useIPv6 = true; 

                    for (int i = 0; i < remoteAddresses.Length; i++) 
                    { 
                        IPAddress address = remoteAddresses[i];
 
                        if (address.AddressFamily == AddressFamily.InterNetwork && useIPv4)
                        {
                            UdpSocket socket = GetSendSocket(address, destination, out exceptionToBeThrown);
                            if (socket == null) 
                            {
                                if (this.State != CommunicationState.Opened) 
                                { 
                                    //time to exit, the channel is closing down.
                                    break; 
                                }
                                else
                                {
                                    //no matching socket on IPv4, so ignore future IPv4 addresses 
                                    //in the remoteAddresses list
                                    useIPv4 = false; 
                                } 
                            }
                            else 
                            {
                                remoteEndPoint = new IPEndPoint(address, destination.Port);
                                socketList = new UdpSocket[] { socket };
                                break; 
                            }
 
                        } 
                        else if (address.AddressFamily == AddressFamily.InterNetworkV6 && useIPv6)
                        { 
                            UdpSocket socket = GetSendSocket(address, destination, out exceptionToBeThrown);
                            if (socket == null)
                            {
                                if (this.State != CommunicationState.Opened) 
                                {
                                    //time to exit, the channel is closing down. 
                                    break; 
                                }
                                else 
                                {
                                    //no matching socket on IPv6, so ignore future IPv6 addresses
                                    //in the remoteAddresses list
                                    useIPv6 = false; 
                                }
                            } 
                            else 
                            {
                                remoteEndPoint = new IPEndPoint(address, destination.Port); 
                                socketList = new UdpSocket[] { socket };
                                break;
                            }
                        } 
                    }
                } 
            } 

            return socketList; 
        }

        protected ObjectDisposedException CreateObjectDisposedException()
        { 
            return new ObjectDisposedException(null, SR.ObjectDisposed(this.GetType().Name));
        } 
 
        RetransmitIterator CreateRetransmitIterator(bool sendingMulticast)
        { 
            Fx.Assert(this.retransmitSettings.Enabled, "CreateRetransmitCalculator called when no retransmission set to happen");
            int lowerBound = this.retransmitSettings.GetDelayLowerBound();
            int upperBound = this.retransmitSettings.GetDelayUpperBound();
            int currentDelay = this.randomNumberGenerator.Next(lowerBound, upperBound); 

            int maxDelay = this.retransmitSettings.GetMaxDelayPerRetransmission(); 
            int maxRetransmitCount = sendingMulticast ? this.retransmitSettings.MaxMulticastRetransmitCount : this.retransmitSettings.MaxUnicastRetransmitCount; 

            return new RetransmitIterator(currentDelay, maxDelay, maxRetransmitCount); 
        }

        //Closes the channel ungracefully during error conditions.
        protected override void OnAbort() 
        {
            Cleanup(true, TimeSpan.Zero); 
        } 

        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) 
        {
            this.OnOpen(timeout);
            return new CompletedAsyncResult(callback, state);
        } 

        protected override void OnEndOpen(IAsyncResult result) 
        { 
            CompletedAsyncResult.End(result);
        } 

        protected override void OnOpen(TimeSpan timeout)
        {
            for (int i = 0; i < this.Sockets.Length; i++) 
            {
                this.Sockets[i].Open(); 
            } 
        }
 
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CloseAsyncResult(
                this, 
                new ChainedBeginHandler(base.OnBeginClose),
                new ChainedEndHandler(base.OnEndClose), 
                timeout, 
                callback,
                state); 
        }

        protected override void OnEndClose(IAsyncResult result)
        { 
            CloseAsyncResult.End(result);
        } 
 
        //Closes the channel gracefully during normal conditions.
        protected override void OnClose(TimeSpan timeout) 
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            Cleanup(false, timeoutHelper.RemainingTime());
            base.OnClose(timeoutHelper.RemainingTime()); 
        }
 
        protected override void OnSend(Message message, TimeSpan timeout) 
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 

            IPEndPoint remoteEndPoint;
            UdpSocket[] sendSockets;
            Exception exceptionToBeThrown; 
            sendSockets = GetSendSockets(message, out remoteEndPoint, out exceptionToBeThrown);
 
            if (exceptionToBeThrown != null) 
            {
                throw FxTrace.Exception.AsError(exceptionToBeThrown); 
            }

            if (timeoutHelper.RemainingTime() <= TimeSpan.Zero)
            { 
                throw FxTrace.Exception.AsError(new TimeoutException(SR.SendTimedOut(remoteEndPoint, timeout)));
            } 
 
            bool returnBuffer = false;
            ArraySegment messageData = default(ArraySegment); 

            bool sendingMulticast = UdpUtility.IsMulticastAddress(remoteEndPoint.Address);
            SynchronousRetransmissionHelper retransmitHelper = null;
            RetransmitIterator retransmitIterator = null; 

            bool shouldRetransmit = ShouldRetransmitMessage(sendingMulticast); 
 
            try
            { 
                if (shouldRetransmit)
                {
                    retransmitIterator = CreateRetransmitIterator(sendingMulticast);
                    retransmitHelper = new SynchronousRetransmissionHelper(sendingMulticast); 
                    RetransmitStarting(message.Headers.MessageId, retransmitHelper);
                } 
 
                messageData = this.EncodeMessage(message);
                returnBuffer = true; 

                TransmitMessage(messageData, sendSockets, remoteEndPoint, timeoutHelper);

                if (shouldRetransmit) 
                {
                    while(retransmitIterator.MoveNext()) 
                    { 
                        //wait for currentDelay time, then retransmit
                        if (retransmitIterator.CurrentDelay > 0) 
                        {
                            retransmitHelper.Wait(retransmitIterator.CurrentDelay);
                        }
 
                        if (retransmitHelper.IsCanceled)
                        { 
                            ThrowIfAborted(); 
                            return;
                        } 

                        //since we only invoke the encoder once just before the initial send of the message
                        //we need to handle logging the message in the retransmission case
                        if (MessageLogger.LogMessagesAtTransportLevel) 
                        {
                            LogMessage(ref message, messageData); 
                        } 

                        TransmitMessage(messageData, sendSockets, remoteEndPoint, timeoutHelper); 
                    }
                }
            }
            finally 
            {
                if (returnBuffer) 
                { 
                    this.BufferManager.ReturnBuffer(messageData.Array);
                } 

                if (shouldRetransmit)
                {
                    RetransmitStopping(message.Headers.MessageId); 

                    if (retransmitHelper != null) 
                    { 
                        retransmitHelper.Dispose();
                    } 
                }
            }
        }
 
        static void LogMessage(ref Message message, ArraySegment messageData)
        { 
            using (XmlDictionaryReader xmlDictionaryReader = XmlDictionaryReader.CreateTextReader(messageData.Array, messageData.Offset, messageData.Count, null, XmlDictionaryReaderQuotas.Max, null)) 
            {
                MessageLogger.LogMessage(ref message, xmlDictionaryReader, MessageLoggingSource.TransportSend); 
            }
        }

        bool ShouldRetransmitMessage(bool sendingMulticast) 
        {
            if (sendingMulticast) 
            { 
                return this.retransmitSettings.MaxMulticastRetransmitCount > 0;
            } 
            else
            {
                return this.retransmitSettings.MaxUnicastRetransmitCount > 0;
            } 
        }
 
        protected override IAsyncResult  OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state) 
        {
            return new SendAsyncResult(this, message, timeout, callback, state); 
        }

        protected override void  OnEndSend(IAsyncResult result)
        { 
            SendAsyncResult.End(result);
        } 
 
        protected override void AddHeadersTo(Message message)
        { 
            Fx.Assert(message != null, "Message can't be null");

            if (message.Version.Addressing != AddressingVersion.None)
            { 
                if (message.Headers.MessageId == null)
                { 
                    message.Headers.MessageId = new UniqueId(); 
                }
            } 
            else
            {
                if (this.retransmitSettings.Enabled == true)
                { 
                    //we should only get here if some channel above us starts producing messages that don't match the encoder's message version.
                    throw FxTrace.Exception.AsError(new ProtocolException(SR.RetransmissionRequiresAddressingOnMessage(message.Version.Addressing.ToString()))); 
                } 
            }
        } 

        //we're guaranteed by CommunicationObject that at most ONE of Close or BeginClose will be called once.
        void Cleanup(bool aborting, TimeSpan timeout)
        { 
            bool needToWait = false;
 
            if (this.cleanedUp) 
            {
                return; 
            }

            lock (this.ThisLock)
            { 
                if (this.cleanedUp)
                { 
                    return; 
                }
 
                if (!aborting && this.retransmitList != null && this.retransmitList.Count > 0)
                {
                    needToWait = true;
                    this.retransmissionDoneWaitHandle = new AsyncWaitHandle(EventResetMode.ManualReset); 
                }
                else 
                { 
                    // copied this call here in order to avoid releasing then retaking lock
                    CleanupAfterWait(aborting); 
                }
            }

            if (needToWait) 
            {
                if (!this.retransmissionDoneWaitHandle.Wait(timeout)) 
                { 
                    throw FxTrace.Exception.AsError(new TimeoutException(SR.TimeoutOnOperation(timeout)));
                } 

                lock (this.ThisLock)
                {
                    this.retransmissionDoneWaitHandle = null; 

                    //another thread could have called Abort while Close() was waiting for retransmission to complete. 
                    if (this.cleanedUp) 
                    {
                        return; 
                    }

                    CleanupAfterWait(aborting);
                } 
            }
            CleanupBufferManager(); 
        } 

        void CleanupBufferManager() 
        {
            if (this.OwnsBufferManager)
            {
                this.BufferManager.Clear(); 
            }
        } 
 
        // must be called from within this.ThisLock
        void CleanupAfterWait(bool aborting) 
        {
            Fx.Assert(!this.cleanedUp, "We should only clean up once");

            if (this.retransmitList != null) 
            {
                foreach (IUdpRetransmitter retransmitter in this.retransmitList.Values) 
                { 
                    retransmitter.CancelRetransmission();
                } 

                if (aborting && this.retransmissionDoneWaitHandle != null)
                {
                    //If another thread has called close and is waiting for retransmission to complete, 
                    //we need to make sure that thread gets unblocked.
                    this.retransmissionDoneWaitHandle.Set(); 
                } 

                this.retransmitList = null; 
            }

            for (int i = 0; i < this.Sockets.Length; i++)
            { 
                this.Sockets[i].Close();
            } 
 
            if (this.DuplicateDetector != null)
            { 
                this.DuplicateDetector.Dispose();
            }

            if (this.ReceiveManager != null) 
            {
                this.ReceiveManager.Close(); 
            } 

            this.cleanedUp = true; 
        }

        UdpSocket GetSendSocketUsingInterfaceIndex(MessageProperties properties, out Exception exceptionToBeThrown)
        { 
            UdpMessageProperty property;
            UdpSocket socket = null; 
            exceptionToBeThrown = null; 

            if (!UdpMessageProperty.TryGet(properties, out property)) 
            {
                if (this.Sockets.Length > 1)
                {
                    //this property is required on all messages sent from the channel listener. 
                    //the client channel does not use this method to get the send sockets or the
                    //remote endpoint, so it is safe to throw... 
 
                    exceptionToBeThrown = new InvalidOperationException(SR.UdpMessagePropertyMissing);
                } 
                else
                {
                    //there is only one socket, so just send it on that one.
                    socket = this.Sockets[0]; 
                }
            } 
            else 
            {
                socket = GetSendSocket(property.InterfaceIndex, out exceptionToBeThrown); 
            }

            return socket;
        } 

        void RetransmitStarting(UniqueId messageId, IUdpRetransmitter retransmitter) 
        { 
            Fx.Assert(this.retransmitSettings.Enabled, "RetransmitStarting called when retransmission is disabled");
 
            lock (this.ThisLock)
            {
                ThrowIfDisposed();
 
                if (this.retransmitList.ContainsKey(messageId))
                { 
                    //someone is sending a message with the same MessageId 
                    //while a retransmission is still in progress for that ID.
                    throw FxTrace.Exception.AsError(new InvalidOperationException(SR.RecycledMessageIdDuringRetransmission(messageId))); 
                }
                else
                {
                    this.retransmitList[messageId] = retransmitter; 
                }
            } 
        } 

        void RetransmitStopping(UniqueId messageId) 
        {
            Fx.Assert(this.retransmitSettings.Enabled, "RetransmitStopping called when retransmission is disabled");

            lock (this.ThisLock) 
            {
                //Cleanup sets retransmitList to null, so check before using... 
                if (this.retransmitList != null) 
                {
                    this.retransmitList.Remove(messageId); 

                    //if we are closing down, then we need to unblock the Cleanup code
                    // this.retransmissionDoneEvent only != null if on cleaning up; abort case means that it == null.
                    if (this.retransmitList.Count == 0 && this.retransmissionDoneWaitHandle != null) 
                    {
                        this.retransmissionDoneWaitHandle.Set(); 
                    } 
                }
            } 
        }

        void TransmitMessage(ArraySegment messageBytes, UdpSocket[] sockets, IPEndPoint remoteEndpoint, TimeoutHelper timeoutHelper)
        { 
            Fx.Assert(messageBytes.Array != null, "message data array can't be null");
            Fx.Assert(sockets != null, "sockets can't be null"); 
            Fx.Assert(sockets.Length > 0, "sockets must contain at least one item"); 
            Fx.Assert(remoteEndpoint != null, "remoteEndPoint can't be null");
 
            for (int i = 0; i < sockets.Length; i++)
            {
                if (timeoutHelper.RemainingTime() <= TimeSpan.Zero)
                { 
                    throw FxTrace.Exception.AsError(new TimeoutException(SR.SendTimedOut(remoteEndpoint, timeoutHelper.OriginalTimeout)));
                } 
 
                sockets[i].SendTo(messageBytes.Array, messageBytes.Offset, messageBytes.Count, remoteEndpoint);
            } 
        }

        void ValidateDestinationUri(Uri destination, bool isVia)
        { 
            if (!destination.Scheme.Equals(UdpConstants.Scheme, StringComparison.OrdinalIgnoreCase))
            { 
                if (isVia) 
                {
                    throw FxTrace.Exception.AsError(new InvalidOperationException(SR.ViaUriIsNotValid(destination, SR.UriSchemeNotSupported(destination.Scheme)))); 
                }
                else
                {
                    throw FxTrace.Exception.AsError(new InvalidOperationException(SR.ToAddressIsNotValid(destination, SR.UriSchemeNotSupported(destination.Scheme)))); 
                }
            } 
 
            if (destination.Port < 1 || destination.Port > IPEndPoint.MaxPort)
            { 
                if (isVia)
                {
                    throw FxTrace.Exception.AsError(new InvalidOperationException(SR.ViaUriIsNotValid(destination, SR.PortNumberInvalid(1, IPEndPoint.MaxPort))));
                } 
                else
                { 
                    throw FxTrace.Exception.AsError(new InvalidOperationException(SR.ToAddressIsNotValid(destination, SR.PortNumberInvalid(1, IPEndPoint.MaxPort)))); 
                }
            } 
        }

        class RetransmitIterator
        { 
            int maxDelay;
            int retransmitCount; 
            int initialDelay; 

            internal RetransmitIterator(int initialDelay, int maxDelay, int retransmitCount) 
            {
                Fx.Assert(initialDelay >= 0, "initialDelay cannot be negative");
                Fx.Assert(maxDelay >= initialDelay, "maxDelay must be >= initialDelay");
                Fx.Assert(retransmitCount > 0, "retransmitCount must be > 0"); 

                this.CurrentDelay = -1; 
                this.initialDelay = initialDelay; 
                this.maxDelay = maxDelay;
                this.retransmitCount = retransmitCount; 
            }

            public int CurrentDelay
            { 
                get;
                private set; 
            } 

            //should be called before each retransmission to determine if 
            //another one is needed.
            public bool MoveNext()
            {
                if (this.CurrentDelay < 0) 
                {
                    this.CurrentDelay = this.initialDelay; 
                    return true; 
                }
 
                bool shouldContinue = --this.retransmitCount > 0;

                if (shouldContinue && this.CurrentDelay < this.maxDelay)
                { 
                    this.CurrentDelay = Math.Min(this.CurrentDelay * 2, this.maxDelay);
                } 
 
                return shouldContinue;
            } 
        }

        interface IUdpRetransmitter
        { 
            void CancelRetransmission();
            bool IsMulticast { get; } 
        } 

        sealed class SynchronousRetransmissionHelper : IUdpRetransmitter, IDisposable 
        {
            ManualResetEvent cancelEvent;
            object thisLock;
            bool cleanedUp; 

            public SynchronousRetransmissionHelper(bool isMulticast) 
            { 
                this.thisLock = new object();
                this.IsMulticast = isMulticast; 
                this.cancelEvent = new ManualResetEvent(false);
            }

            public bool IsMulticast 
            {
                get; 
                private set; 
            }
 
            public bool IsCanceled
            {
                get;
                private set; 
            }
 
            bool ResetEvent() 
            {
                lock (this.thisLock) 
                {
                    if (!this.IsCanceled && !this.cleanedUp)
                    {
                        this.cancelEvent.Reset(); 
                        return true;
                    } 
                } 
                return false;
            } 

            public void Wait(int millisecondsTimeout)
            {
                if (ResetEvent()) 
                {
                    //Dispose should only be called by the same thread that 
                    //is calling this function, making it so that we don't need a lock here... 
                    this.cancelEvent.WaitOne(millisecondsTimeout);
                } 
            }

            public void CancelRetransmission()
            { 
                lock (this.thisLock)
                { 
                    this.IsCanceled = true; 

                    if (!this.cleanedUp) 
                    {
                        this.cancelEvent.Set();
                    }
                } 
            }
 
            public void Dispose() 
            {
                lock (this.thisLock) 
                {
                    if (!this.cleanedUp)
                    {
                        this.cleanedUp = true; 
                        this.cancelEvent.Dispose();
                    } 
                } 
            }
        } 

        // Control flow for async path
        // We use this mechanism to avoid initializing two async objects as logically cleanup+close is one operation.
        // At any point in the Begin* methods, we may go async. The steps are: 
        // - Cleanup channel
        // - Close channel 
        // 
        // initialize CloseAsyncResult
        //   - BeginCleanupAndClose (always attempt to:) 
        //     - BeginCleanup (if async, set callback to CompleteCleanupAndClose; if [....], then:)
        //       - CompleteCleanup (will return true, so we will:)
        //       - BeginClose (if async, set callback to CompleteClose; if [....], then)
        //         - CompleteClose 
        //     - CompleteCleanupAndClose (always attempt to:)
        //       - CompleteCleanup (will return true, so we will:) 
        //         - BeginClose (if async, set callback to CompleteClose; if [....], then) 
        //           - CompleteClose
        class CloseAsyncResult : AsyncResult 
        {
            UdpDuplexChannel channel;
            TimeoutHelper timeoutHelper;
            ChainedBeginHandler baseBeginClose; 
            ChainedEndHandler baseEndClose;
 
            static Action completeCleanupCallback = new Action(CompleteCleanupAndClose); 
            static AsyncCompletion completeCloseCallback = new AsyncCompletion(CompleteClose);
 
            public CloseAsyncResult(UdpDuplexChannel channel, ChainedBeginHandler baseBeginClose, ChainedEndHandler baseEndClose, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.channel = channel; 
                this.baseBeginClose = baseBeginClose;
                this.baseEndClose = baseEndClose; 
                this.timeoutHelper = new TimeoutHelper(timeout); 

                if (this.BeginCleanupAndClose()) 
                {
                    Complete(true);
                }
            } 

            bool BeginCleanupAndClose() 
            { 
                if (this.BeginCleanup())
                { 
                    return this.BeginClose();
                }
                return false;
            } 

            bool CompleteCleanupAndClose(Exception exception) 
            { 
                if (this.CompleteCleanup(exception))
                { 
                    return this.BeginClose();
                }
                return false;
            } 

            static void CompleteCleanupAndClose(object state, TimeoutException exception) 
            { 
                CloseAsyncResult thisPtr = (CloseAsyncResult)state;
                Exception completionException = null; 
                bool completeSelf = true;

                try
                { 
                    completeSelf = thisPtr.CompleteCleanupAndClose(exception);
                } 
                catch (Exception e) 
                {
                    if (Fx.IsFatal(e)) 
                    {
                        throw;
                    }
                    completionException = e; 
                }
 
                if (completeSelf) 
                {
                    thisPtr.Complete(false, completionException); 
                }
            }

            bool BeginClose() 
            {
                // AsyncResult.AsyncCompletionWrapperCallback takes care of catching the exceptions for us. 
                IAsyncResult result = this.baseBeginClose(timeoutHelper.RemainingTime(), this.PrepareAsyncCompletion(completeCloseCallback), this); 
                // SyncContinue calls CompleteClose for us in [....] case.
                return this.SyncContinue(result); 
            }

            static bool CompleteClose(IAsyncResult result)
            { 
                // AsyncResult.AsyncCompletionWrapperCallback takes care of catching exceptions for us.
                CloseAsyncResult thisPtr = (CloseAsyncResult)result.AsyncState; 
                //We are completing the base class close operation at this point. 
                thisPtr.baseEndClose(result);
                return true; 
            }

            bool BeginCleanup()
            { 
                bool needToWait = false;
 
                if (this.channel.cleanedUp) 
                {
                    return true; 
                }

                lock (this.channel.ThisLock)
                { 
                    if (this.channel.cleanedUp)
                    { 
                        return true; 
                    }
 
                    // we're never aborting in this case...
                    if (this.channel.retransmitList != null && this.channel.retransmitList.Count > 0)
                    {
                        needToWait = true; 
                        this.channel.retransmissionDoneWaitHandle = new AsyncWaitHandle(EventResetMode.ManualReset);
                    } 
                    else 
                    {
                        this.channel.CleanupAfterWait(false); 
                    }
                }

                //we're guaranteed by CommunicationObject that at most ONE of Close or BeginClose will be called once. 
                //we don't null out retransmissionDoneEvent in the abort case; should be safe to use here.
                if (needToWait && !this.channel.retransmissionDoneWaitHandle.WaitAsync(completeCleanupCallback, this, this.timeoutHelper.RemainingTime())) 
                { 
                    return false;
                } 
                else
                {
                    // we don't need to wait, or we've been signalled already on the WaitAsync call
                    this.channel.CleanupBufferManager(); 
                    return true;
                } 
            } 

            bool CompleteCleanup(Exception exception) 
            {
                if (exception != null)
                {
                    Fx.Assert(exception.GetType() == typeof(TimeoutException), "Exception on callback should always be TimeoutException"); 
                    throw FxTrace.Exception.AsError(new TimeoutException(SR.TimeoutOnOperation(this.timeoutHelper.OriginalTimeout)));
                } 
 
                lock (this.channel.ThisLock)
                { 
                    this.channel.retransmissionDoneWaitHandle = null;

                    //another thread could have called Abort while Close() was waiting for retransmission to complete.
                    if (this.channel.cleanedUp) 
                    {
                        return true; 
                    } 

                    // never aborting here 
                    this.channel.CleanupAfterWait(false);
                }

                this.channel.CleanupBufferManager(); 
                return true;
            } 
 
            internal static void End(IAsyncResult result)
            { 
                AsyncResult.End(result);
            }
        }
 
        class SendAsyncResult : AsyncResult, IUdpRetransmitter
        { 
            UdpDuplexChannel channel; 
            ArraySegment messageData;
            TimeoutHelper timeoutHelper; 
            IPEndPoint remoteEndpoint;
            int currentSocket;
            UdpSocket [] sendSockets;
            IOThreadTimer retransmitTimer; 
            RetransmitIterator retransmitIterator;
            Message message; 
            bool retransmissionEnabled; 

            static AsyncCallback onSocketSendComplete = Fx.ThunkCallback(new AsyncCallback(OnSocketSendComplete)); 
            static Action onRetransmitMessage = new Action(OnRetransmitMessage);

            public SendAsyncResult(UdpDuplexChannel channel, Message message, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state) 
            {
                this.timeoutHelper = new TimeoutHelper(timeout); 
 
                this.channel = channel;
                this.message = message; 
                bool throwing = true;
                bool completedSynchronously = false;

                try 
                {
                    Initialize(message); 
 
                    completedSynchronously = BeginTransmitMessage();
 
                    if (completedSynchronously && this.retransmissionEnabled)
                    {
                        //initial send completed [....], now we need to start the retransmission process...
                        completedSynchronously = BeginRetransmission(); 
                    }
 
                    throwing = false; 
                }
                finally 
                {
                    if (throwing)
                    {
                        Cleanup(); 
                    }
                } 
 
                if (completedSynchronously)
                { 
                    CompleteAndCleanup(true, null);
                }
            }
 
            public bool IsCanceled
            { 
                get; 
                private set;
            } 

            public bool IsMulticast
            {
                get; 
                private set;
            } 
 
            bool BeginTransmitMessage()
            { 
                this.currentSocket = 0;
                return ContinueTransmitting(null);
            }
 
            bool ContinueTransmitting(IAsyncResult socketAsyncResult)
            { 
                while (this.currentSocket < this.sendSockets.Length) 
                {
                    if (socketAsyncResult == null) 
                    {
                        socketAsyncResult = this.sendSockets[this.currentSocket].BeginSendTo(this.messageData.Array,
                            this.messageData.Offset, this.messageData.Count, remoteEndpoint, onSocketSendComplete, this);
 
                        if (!socketAsyncResult.CompletedSynchronously)
                        { 
                            return false; 
                        }
                    } 

                    this.sendSockets[this.currentSocket].EndSendTo(socketAsyncResult);

                    //check for timeout after calling socket.EndSendTo 
                    //so that we don't leave the socket in a bad state/leak async results
                    ThrowIfTimedOut(); 
 
                    if (this.IsCanceled)
                    { 
                        //don't send on the next socket and return true to cause Complete to be called.
                        return true;
                    }
 
                    this.currentSocket++;
                    socketAsyncResult = null; 
                } 

                return true; 
            }

            static void OnSocketSendComplete(IAsyncResult result)
            { 
                if (result.CompletedSynchronously)
                { 
                    return; 
                }
 
                SendAsyncResult thisPtr = (SendAsyncResult)result.AsyncState;
                bool completeSelf = false;
                Exception completionException = null;
 
                try
                { 
                    completeSelf = thisPtr.ContinueTransmitting(result); 

                    if (completeSelf && thisPtr.retransmissionEnabled) 
                    {
                        completeSelf = thisPtr.BeginRetransmission();
                    }
                } 
                catch (Exception e)
                { 
                    if (Fx.IsFatal(e)) 
                    {
                        throw; 
                    }

                    completionException = e;
                    completeSelf = true; 
                }
 
                if (completeSelf) 
                {
                    thisPtr.CompleteAndCleanup(false, completionException); 
                }
            }

            bool BeginRetransmission() 
            {
                //BeginRetransmission should only be called in the case where transmission of the message 
                //completes synchronously. 
                return ContinueRetransmission(RetransmitState.TransmitCompleted);
            } 

            bool ContinueRetransmission(RetransmitState state)
            {
                ThrowIfTimedOut(); 

                while (true) 
                { 
                    switch (state)
                    { 
                        case RetransmitState.TransmitCompleted:
                            if (!retransmitIterator.MoveNext())
                            {
                                //We are done retransmitting 
                                return true;
                            } 
 
                            if (retransmitIterator.CurrentDelay > 0)
                            { 
                                this.retransmitTimer.Set(retransmitIterator.CurrentDelay);
                                return false;
                            }
 
                            state = RetransmitState.WaitCompleted;
                            break; 
                        case RetransmitState.WaitCompleted: 
                            if (this.IsCanceled)
                            { 
                                this.channel.ThrowIfAborted();
                                return true;
                            }
 
                            //since we only invoke the encoder once just before the initial send of the message
                            //we need to handle logging the message in the retransmission case 
                            if (MessageLogger.LogMessagesAtTransportLevel) 
                            {
                                UdpDuplexChannel.LogMessage(ref message, messageData); 
                            }

                            if (!BeginTransmitMessage()) //!completedSync
                            { 
                                return false;
                            } 
 
                            state = RetransmitState.TransmitCompleted;
                            break; 

                        default:
                            Fx.Assert("Unknown RetransmitState value encountered");
                            return true; 
                    }
                } 
            } 

            static void OnRetransmitMessage(object state) 
            {
                SendAsyncResult thisPtr = (SendAsyncResult)state;
                bool completeSelf = false;
                Exception completionException = null; 

                try 
                { 
                    completeSelf = thisPtr.ContinueRetransmission(RetransmitState.WaitCompleted);
                } 
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    { 
                        throw;
                    } 
 
                    completionException = e;
                    completeSelf = true; 
                }

                if (completeSelf)
                { 
                    thisPtr.CompleteAndCleanup(false, completionException);
                } 
            } 

            void Initialize(Message message) 
            {
                Exception exceptionToThrow;
                this.sendSockets = this.channel.GetSendSockets(message, out this.remoteEndpoint, out exceptionToThrow);
 
                if (exceptionToThrow != null)
                { 
                    throw FxTrace.Exception.AsError(exceptionToThrow); 
                }
 
                this.IsMulticast = UdpUtility.IsMulticastAddress(this.remoteEndpoint.Address);

                if (channel.ShouldRetransmitMessage(this.IsMulticast))
                { 
                    this.retransmissionEnabled = true;
                    this.channel.RetransmitStarting(this.message.Headers.MessageId, this); 
                    this.retransmitTimer = new IOThreadTimer(onRetransmitMessage, this, false); 
                    this.retransmitIterator = this.channel.CreateRetransmitIterator(this.IsMulticast);
                } 

                this.messageData = this.channel.EncodeMessage(message);
            }
 
            //tries to terminate retransmission early, but won't cancel async IO immediately
            public void CancelRetransmission() 
            { 
                this.IsCanceled = true;
            } 


            void ThrowIfTimedOut()
            { 
                if (this.timeoutHelper.RemainingTime() <= TimeSpan.Zero)
                { 
                    throw FxTrace.Exception.AsError(new TimeoutException(SR.TimeoutOnOperation(timeoutHelper.OriginalTimeout))); 
                }
            } 

            void Cleanup()
            {
                if (this.retransmissionEnabled) 
                {
                    this.channel.RetransmitStopping(this.message.Headers.MessageId); 
                    this.retransmitTimer.Cancel(); 
                }
 
                if (this.messageData.Array != null)
                {
                    this.channel.BufferManager.ReturnBuffer(this.messageData.Array);
                    this.messageData = default(ArraySegment); 
                }
            } 
 
            public static void End(IAsyncResult result)
            { 
                AsyncResult.End(result);
            }

            void CompleteAndCleanup(bool completedSynchronously, Exception completionException) 
            {
                Cleanup(); 
                Complete(completedSynchronously, completionException); 
            }
 
            enum RetransmitState
            {
                WaitCompleted,
                TransmitCompleted, 
            }
        } 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
                        

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK