Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / NetFx40 / System.ServiceModel.Discovery / System / ServiceModel / Channels / UdpChannelListener.cs / 1484997 / UdpChannelListener.cs
//---------------------------------------------------------------- // Copyright (c) Microsoft Corporation. All rights reserved. //--------------------------------------------------------------- namespace System.ServiceModel.Channels { using System; using System.Collections.Generic; using System.Net; using System.Net.NetworkInformation; using System.Net.Sockets; using System.Runtime; using System.ServiceModel.Description; using System.ServiceModel.Discovery; using System.Threading; class UdpChannelListener : ChannelListenerBase, IUdpReceiveHandler { BufferManager bufferManager; ServerUdpDuplexChannel channelInstance; InputQueue channelQueue; DuplicateMessageDetector duplicateDetector; bool isMulticast; List listenSockets; Uri listenUri; MessageEncoderFactory messageEncoderFactory; EventHandler onChannelClosed; IUdpTransportSettings settings; UdpSocketReceiveManager socketReceiveManager; int cleanedUp; internal UdpChannelListener(IUdpTransportSettings settings, BindingContext context) : base(context.Binding) { Fx.Assert(settings != null, "settings can't be null"); Fx.Assert(context != null, "BindingContext parameter can't be null"); this.settings = settings; this.cleanedUp = 0; this.duplicateDetector = null; if (settings.DuplicateMessageHistoryLength > 0) { this.duplicateDetector = new DuplicateMessageDetector(settings.DuplicateMessageHistoryLength); } this.bufferManager = BufferManager.CreateBufferManager(settings.MaxBufferPoolSize, (int)settings.MaxReceivedMessageSize); this.onChannelClosed = new EventHandler(OnChannelClosed); if (this.settings.SocketReceiveBufferSize < this.settings.MaxReceivedMessageSize) { throw FxTrace.Exception.ArgumentOutOfRange("SocketReceiveBufferSize", this.settings.SocketReceiveBufferSize, SR.Property1LessThanOrEqualToProperty2("MaxReceivedMessageSize", this.settings.MaxReceivedMessageSize, "SocketReceiveBufferSize", this.settings.SocketReceiveBufferSize)); } this.messageEncoderFactory = UdpUtility.GetEncoder(context); InitUri(context); //Note: because we are binding the sockets in InitSockets, we can start receiving data immediately. //If there is a delay between the Building of the listener and the call to Open, stale data could build up //inside the Winsock buffer. We have decided that making sure the port is updated correctly in the listen uri //(e.g. in the ListenUriMode.Unique case) before leaving the build step is more important than the //potential for stale data. InitSockets(context.ListenUriMode == ListenUriMode.Unique); Fx.Assert(!this.listenUri.IsDefaultPort, "Listen Uri's port should never be the default port: " + this.listenUri); } public MessageEncoderFactory MessageEncoderFactory { get { return this.messageEncoderFactory; } } public override Uri Uri { get { return this.listenUri; } } protected override TimeSpan DefaultReceiveTimeout { get { return UdpConstants.Defaults.ReceiveTimeout; } } protected override TimeSpan DefaultSendTimeout { get { return UdpConstants.Defaults.SendTimeout; } } int IUdpReceiveHandler.MaxReceivedMessageSize { get { return (int)this.settings.MaxReceivedMessageSize; } } string Scheme { get { return UdpConstants.Scheme; } } public override T GetProperty () { T messageEncoderProperty = this.MessageEncoderFactory.Encoder.GetProperty (); if (messageEncoderProperty != null) { return messageEncoderProperty; } if (typeof(T) == typeof(MessageVersion)) { return (T)(object)this.MessageEncoderFactory.Encoder.MessageVersion; } return base.GetProperty (); } void IUdpReceiveHandler.HandleAsyncException(Exception ex) { HandleReceiveException(ex); } //returns false if the message was dropped because the max pending message count was hit. bool IUdpReceiveHandler.HandleDataReceived(ArraySegment data, EndPoint remoteEndpoint, int interfaceIndex, Action onMessageDequeuedCallback) { BufferManager bufferManager = this.bufferManager; bool returnBuffer = true; string messageHash = null; Message message = null; bool continueReceiving = true; try { IPEndPoint remoteIPEndPoint = (IPEndPoint)remoteEndpoint; if (bufferManager != null) { message = UdpUtility.DecodeMessage(this.duplicateDetector, this.messageEncoderFactory.Encoder, bufferManager, data, remoteIPEndPoint, interfaceIndex, true, out messageHash); if (message != null) { continueReceiving = Dispatch(message, onMessageDequeuedCallback); returnBuffer = !continueReceiving; } } else { Fx.Assert(this.State != CommunicationState.Opened, "buffer manager should only be null when closing down and the channel instance has taken control of the receive manager."); IUdpReceiveHandler receiveHandler = (IUdpReceiveHandler)this.channelInstance; if (receiveHandler != null) { returnBuffer = false; //let the channel instance take care of the buffer continueReceiving = receiveHandler.HandleDataReceived(data, remoteEndpoint, interfaceIndex, onMessageDequeuedCallback); } else { //both channel and listener are shutting down, so drop the message and stop the receive loop continueReceiving = false; } } } catch (Exception e) { if (Fx.IsFatal(e)) { returnBuffer = false; throw; } HandleReceiveException(e); } finally { if (returnBuffer) { if (message != null) { if (this.duplicateDetector != null) { Fx.Assert(messageHash != null, "message hash should always be available if duplicate detector is enabled"); this.duplicateDetector.RemoveEntry(messageHash); } message.Close(); // implicitly returns the buffer } else { bufferManager.ReturnBuffer(data.Array); } } } return continueReceiving; } protected override Type GetCommunicationObjectType() { return this.GetType(); } protected override void OnAbort() { Cleanup(); } protected override IDuplexChannel OnAcceptChannel(TimeSpan timeout) { ThrowPending(); TimeoutHelper.ThrowIfNegativeArgument(timeout); return this.channelQueue.Dequeue(timeout); } protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state) { ThrowPending(); TimeoutHelper.ThrowIfNegativeArgument(timeout); return this.channelQueue.BeginDequeue(timeout, callback, state); } protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) { this.OnClose(timeout); return new CompletedAsyncResult(callback, state); } protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) { this.OnOpen(timeout); return new CompletedAsyncResult(callback, state); } protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state) { ThrowPending(); TimeoutHelper.ThrowIfNegativeArgument(timeout); return this.channelQueue.BeginWaitForItem(timeout, callback, state); } protected override void OnClosing() { if (this.channelInstance != null) { lock (ThisLock) { if (this.channelInstance != null) { if (this.channelInstance.TransferReceiveManagerOwnership(this.socketReceiveManager, this.duplicateDetector)) { //don't clean these objects up, they now belong to the channel instance this.socketReceiveManager = null; this.duplicateDetector = null; this.bufferManager = null; } } this.channelInstance = null; } } base.OnClosing(); } protected override void OnClose(TimeSpan timeout) { Cleanup(); } protected override IDuplexChannel OnEndAcceptChannel(IAsyncResult result) { ServerUdpDuplexChannel channel; if (this.channelQueue.EndDequeue(result, out channel)) { return channel; } else { throw FxTrace.Exception.AsError(new TimeoutException()); } } protected override void OnEndClose(IAsyncResult result) { CompletedAsyncResult.End(result); } protected override void OnEndOpen(IAsyncResult result) { CompletedAsyncResult.End(result); } protected override bool OnEndWaitForChannel(IAsyncResult result) { return this.channelQueue.EndWaitForItem(result); } protected override void OnOpen(TimeSpan timeout) { } protected override void OnOpened() { this.channelQueue = new InputQueue (); Fx.Assert(this.socketReceiveManager == null, "receive manager shouldn't be initialized yet"); this.socketReceiveManager = new UdpSocketReceiveManager(this.listenSockets.ToArray(), UdpConstants.PendingReceiveCountPerProcessor * Environment.ProcessorCount, this.bufferManager, this); //do the state change to CommunicationState.Opened before starting the receive loop. //this avoids a ---- between transitioning state and processing messages that are //already in the socket receive buffer. base.OnOpened(); this.socketReceiveManager.Open(); } protected override bool OnWaitForChannel(TimeSpan timeout) { ThrowPending(); TimeoutHelper.ThrowIfNegativeArgument(timeout); return this.channelQueue.WaitForItem(timeout); } void Cleanup() { if (Interlocked.Increment(ref this.cleanedUp) == 1) { lock (this.ThisLock) { if (this.socketReceiveManager != null) { this.socketReceiveManager.Close(); this.socketReceiveManager = null; } if (this.listenSockets != null) { //don't close the sockets...we don't open them, socket manager does. //closing them would cause ref counts to get out of [....]. this.listenSockets.Clear(); this.listenSockets = null; } } if (this.bufferManager != null) { this.bufferManager.Clear(); } if (this.channelQueue != null) { this.channelQueue.Close(); } if (this.duplicateDetector != null) { this.duplicateDetector.Dispose(); } } } //must be called under a lock bool CreateOrRetrieveChannel(out ServerUdpDuplexChannel channel) { bool channelCreated = false; channel = this.channelInstance; if (channel == null) { channelCreated = true; UdpSocket[] sendSockets = this.listenSockets.ToArray(); channel = new ServerUdpDuplexChannel(this, sendSockets, new EndpointAddress(this.listenUri), this.listenUri, this.isMulticast); this.channelInstance = channel; channel.Closed += this.onChannelClosed; } return channelCreated; } bool Dispatch(Message message, Action onMessageDequeuedCallback) { ServerUdpDuplexChannel channel; bool channelCreated; lock (this.ThisLock) { if (this.State != CommunicationState.Opened) { Fx.Assert(this.State > CommunicationState.Opened, "DispatchMessage called when object is not fully opened. This would indicate that the receive loop started before transitioning to CommunicationState.Opened, which should not happen."); //Shutting down - the message will get closed by the caller (IUdpReceiveHandler.OnMessageReceivedCallback) return false; } channelCreated = CreateOrRetrieveChannel(out channel); } if (channelCreated) { this.channelQueue.EnqueueAndDispatch(channel, null, false); } return channel.EnqueueMessage(message, onMessageDequeuedCallback); } //Tries to enqueue this async exception onto the channel instance if possible, //puts it onto the local exception queue otherwise. void HandleReceiveException(Exception ex) { UdpDuplexChannel channel = this.channelInstance; if (channel != null) { channel.HandleReceiveException(ex); } else { if (ServerUdpDuplexChannel.CanIgnoreException(ex)) { FxTrace.Exception.AsWarning(ex); } else { this.channelQueue.EnqueueAndDispatch(UdpUtility.WrapAsyncException(ex), null, false); } } } void InitExplicitUri(Uri listenUriBaseAddress, string relativeAddress) { if (listenUriBaseAddress.IsDefaultPort || listenUriBaseAddress.Port == 0) { throw FxTrace.Exception.ArgumentOutOfRange("context.ListenUriBaseAddress", listenUriBaseAddress, SR.ExplicitListenUriModeRequiresPort); } this.listenUri = UdpUtility.AppendRelativePath(listenUriBaseAddress, relativeAddress); } void InitSockets(bool updateListenPort) { bool ipV4; bool ipV6; UdpUtility.CheckSocketSupport(out ipV4, out ipV6); Fx.Assert(this.listenSockets == null, "listen sockets should only be initialized once"); this.listenSockets = new List (); int port = (this.listenUri.IsDefaultPort ? 0 : this.listenUri.Port); if (this.listenUri.HostNameType == UriHostNameType.IPv6 || this.listenUri.HostNameType == UriHostNameType.IPv4) { IPAddress address = IPAddress.Parse(this.listenUri.DnsSafeHost); if (UdpUtility.IsMulticastAddress(address)) { if (this.settings.EnableMulticast == false) { throw FxTrace.Exception.AsError(new InvalidOperationException(SR.UdpMulticastNotEnabled(this.listenUri))); } this.isMulticast = true; NetworkInterface[] adapters = UdpUtility.GetMulticastInterfaces(settings.MulticastInterfaceId); //if listening on a specific adapter, don't disable multicast loopback on that adapter. bool allowMulticastLoopback = !string.IsNullOrEmpty(this.settings.MulticastInterfaceId); for (int i = 0; i < adapters.Length; i++) { if (adapters[i].OperationalStatus == OperationalStatus.Up) { IPInterfaceProperties properties = adapters[i].GetIPProperties(); bool isLoopbackAdapter = adapters[i].NetworkInterfaceType == NetworkInterfaceType.Loopback; if (isLoopbackAdapter) { listenSockets.Add(UdpUtility.CreateListenSocket(address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive, UdpUtility.GetLoopbackInterfaceIndex(adapters[i], address.AddressFamily == AddressFamily.InterNetwork), allowMulticastLoopback, isLoopbackAdapter)); } else if (this.listenUri.HostNameType == UriHostNameType.IPv6) { if (adapters[i].Supports(NetworkInterfaceComponent.IPv6)) { IPv6InterfaceProperties v6Properties = properties.GetIPv6Properties(); if (v6Properties != null) { listenSockets.Add(UdpUtility.CreateListenSocket(address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive, v6Properties.Index, allowMulticastLoopback, isLoopbackAdapter)); } } } else { if (adapters[i].Supports(NetworkInterfaceComponent.IPv4)) { IPv4InterfaceProperties v4Properties = properties.GetIPv4Properties(); if (v4Properties != null) { listenSockets.Add(UdpUtility.CreateListenSocket(address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive, v4Properties.Index, allowMulticastLoopback, isLoopbackAdapter)); } } } } } if (listenSockets.Count == 0) { throw FxTrace.Exception.AsError(new ArgumentException(SR.UdpFailedToFindMulticastAdapter(this.listenUri))); } } else { //unicast - only sends on the default adapter... this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive)); } } else { IPAddress v4Address = IPAddress.Any; IPAddress v6Address = IPAddress.IPv6Any; if (ipV4 && ipV6) { if (port == 0) { //port 0 is only allowed when ListenUriMode == ListenUriMode.Unique UdpSocket ipv4Socket, ipv6Socket; port = UdpUtility.CreateListenSocketsOnUniquePort(v4Address, v6Address, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive, out ipv4Socket, out ipv6Socket); this.listenSockets.Add(ipv4Socket); this.listenSockets.Add(ipv6Socket); } else { this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(v4Address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive)); this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(v6Address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive)); } } else if (ipV4) { this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(v4Address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive)); } else if (ipV6) { this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(v6Address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive)); } } if (updateListenPort && port != this.listenUri.Port) { UriBuilder uriBuilder = new UriBuilder(this.listenUri); uriBuilder.Port = port; this.listenUri = uriBuilder.Uri; } } void InitUniqueUri(Uri listenUriBaseAddress, string relativeAddress) { Fx.Assert(listenUriBaseAddress != null, "listenUriBaseAddress parameter should have been verified before now"); listenUriBaseAddress = UdpUtility.AppendRelativePath(listenUriBaseAddress, relativeAddress); this.listenUri = UdpUtility.AppendRelativePath(listenUriBaseAddress, Guid.NewGuid().ToString()); } void InitUri(BindingContext context) { Uri listenUriBase = context.ListenUriBaseAddress; if (context.ListenUriMode == ListenUriMode.Unique && listenUriBase == null) { UriBuilder uriBuilder = new UriBuilder(this.Scheme, DnsCache.MachineName); uriBuilder.Path = Guid.NewGuid().ToString(); listenUriBase = uriBuilder.Uri; context.ListenUriBaseAddress = listenUriBase; } else { if (listenUriBase == null) { throw FxTrace.Exception.ArgumentNull("context.ListenUriBaseAddress"); } if (!listenUriBase.IsAbsoluteUri) { throw FxTrace.Exception.Argument("context.ListenUriBaseAddress", SR.RelativeUriNotAllowed(listenUriBase)); } if (context.ListenUriMode == ListenUriMode.Unique && !listenUriBase.IsDefaultPort) { throw FxTrace.Exception.Argument("context.ListenUriBaseAddress", SR.DefaultPortRequiredForListenUriModeUnique(listenUriBase)); } if (listenUriBase.Scheme.Equals(this.Scheme, StringComparison.OrdinalIgnoreCase) == false) { throw FxTrace.Exception.Argument("context.ListenUriBaseAddress", SR.UriSchemeNotSupported(listenUriBase.Scheme)); } if (!UdpUtility.IsSupportedHostNameType(listenUriBase.HostNameType)) { throw FxTrace.Exception.Argument("context.ListenUriBaseAddress", SR.UnsupportedUriHostNameType(listenUriBase.Host, listenUriBase.HostNameType)); } } switch (context.ListenUriMode) { case ListenUriMode.Explicit: InitExplicitUri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress); break; case ListenUriMode.Unique: InitUniqueUri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress); break; default: Fx.AssertAndThrow("Unhandled ListenUriMode encountered: " + context.ListenUriMode); break; } } void OnChannelClosed(object sender, EventArgs args) { ServerUdpDuplexChannel closingChannel = (ServerUdpDuplexChannel)sender; closingChannel.Closed -= this.onChannelClosed; lock (ThisLock) { //set to null within a lock because other code //assumes that the instance will not suddenly become null //if it already holds the lock. this.channelInstance = null; } } sealed class ServerUdpDuplexChannel : UdpDuplexChannel { //the listener's buffer manager is used, but the channel won't clear it unless //UdpChannelListener.OnClosing successfully transfers ownership to the channel instance. public ServerUdpDuplexChannel(UdpChannelListener listener, UdpSocket[] sendSockets, EndpointAddress localAddress, Uri via, bool isMulticast) : base(listener, listener.MessageEncoderFactory.Encoder, listener.bufferManager, sendSockets, listener.settings.RetransmissionSettings, listener.settings.MaxPendingMessageCount, localAddress, via, isMulticast, (int)listener.settings.MaxReceivedMessageSize) { } protected override bool IgnoreSerializationException { get { return true; } } //there are some errors on the server side that we should just ignore because the server will not need //to change its behavior if it sees the exception. These errors are not ignored on the client //because it may need to adjust settings (e.g. TTL, send smaller messages, double check server address for correctness) internal static bool CanIgnoreException(Exception ex) { SocketError error; if (UdpUtility.TryGetSocketError(ex, out error)) { switch (error) { case SocketError.ConnectionReset: //"ICMP Destination Unreachable" error - client closed the socket case SocketError.NetworkReset: //ICMP: Time Exceeded (TTL expired) case SocketError.MessageSize: //client sent a message larger than the max message size allowed. return true; } } return false; } internal override void HandleReceiveException(Exception ex) { if (CanIgnoreException(ex)) { FxTrace.Exception.AsWarning(ex); } else { //base implementation will wrap the exception and enqueue it. base.HandleReceiveException(ex); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- StringFormat.cs
- DbFunctionCommandTree.cs
- JsonObjectDataContract.cs
- ParameterCollectionEditorForm.cs
- HMACSHA384.cs
- GridViewRowPresenter.cs
- XslAst.cs
- ExpressionConverter.cs
- ActiveXMessageFormatter.cs
- BasicKeyConstraint.cs
- TripleDESCryptoServiceProvider.cs
- IndependentAnimationStorage.cs
- MsmqHostedTransportManager.cs
- MemberInfoSerializationHolder.cs
- GenericWebPart.cs
- TextServicesCompartmentEventSink.cs
- OleDbWrapper.cs
- NetPeerTcpBinding.cs
- __ConsoleStream.cs
- ArgIterator.cs
- SpecialFolderEnumConverter.cs
- BlockCollection.cs
- BaseParser.cs
- XmlQueryOutput.cs
- Table.cs
- Activity.cs
- DesignerHelpers.cs
- CodeIndexerExpression.cs
- HttpContext.cs
- ToolStripPanel.cs
- GC.cs
- DictionaryBase.cs
- _ConnectStream.cs
- sortedlist.cs
- CounterSample.cs
- SystemSounds.cs
- BinaryOperationBinder.cs
- SynchronizedKeyedCollection.cs
- XmlSchemaException.cs
- CodeNamespace.cs
- TableLayoutPanelResizeGlyph.cs
- MinimizableAttributeTypeConverter.cs
- XsltException.cs
- Point.cs
- CompressEmulationStream.cs
- DateTimeFormat.cs
- SqlTypesSchemaImporter.cs
- ItemTypeToolStripMenuItem.cs
- ExceptionUtil.cs
- Rule.cs
- NamespaceList.cs
- _NestedSingleAsyncResult.cs
- ShaderRenderModeValidation.cs
- ClientTarget.cs
- Pair.cs
- ApplicationBuildProvider.cs
- DataServiceQueryProvider.cs
- ComPersistableTypeElementCollection.cs
- LinkedDataMemberFieldEditor.cs
- GridViewColumnCollection.cs
- CapabilitiesState.cs
- UserControlParser.cs
- CodeThrowExceptionStatement.cs
- HttpCapabilitiesSectionHandler.cs
- Vertex.cs
- PropertyMetadata.cs
- RowUpdatingEventArgs.cs
- PropertyStore.cs
- HttpResponseInternalWrapper.cs
- EntityDataSourceDesignerHelper.cs
- CellIdBoolean.cs
- StaticResourceExtension.cs
- RoleExceptions.cs
- LocalBuilder.cs
- TraceSection.cs
- StrongNamePublicKeyBlob.cs
- FixedBufferAttribute.cs
- ToolZoneDesigner.cs
- RevocationPoint.cs
- SimpleWebHandlerParser.cs
- MimeFormReflector.cs
- XmlNamespaceDeclarationsAttribute.cs
- Trace.cs
- ApplicationDirectoryMembershipCondition.cs
- TimeSpanMinutesOrInfiniteConverter.cs
- ZipArchive.cs
- QueryOutputWriter.cs
- SessionPageStateSection.cs
- Repeater.cs
- NetSectionGroup.cs
- DetailsViewUpdatedEventArgs.cs
- VariantWrapper.cs
- OutputScope.cs
- ToolStripItemBehavior.cs
- Configuration.cs
- UIElement.cs
- AlternationConverter.cs
- GridViewDeletedEventArgs.cs
- CryptoApi.cs
- DrawingGroup.cs