Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / OneWayChannelListener.cs / 1 / OneWayChannelListener.cs
//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------- namespace System.ServiceModel.Channels { using System.Collections.Generic; using System.ServiceModel; using System.Diagnostics; using System.IO; using System.Runtime.Serialization; using System.ServiceModel.Diagnostics; using System.Text; using System.Threading; using System.ServiceModel.Description; using System.Xml; ////// Wraps an IChannelListener class ReplyOneWayChannelListener : LayeredChannelListenerinto an IChannelListener /// { IChannelListener innerChannelListener; bool packetRoutable; public ReplyOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context) : base(context.Binding, context.BuildInnerChannelListener ()) { this.packetRoutable = bindingElement.PacketRoutable; } protected override void OnOpening() { this.innerChannelListener = (IChannelListener )this.InnerChannelListener; base.OnOpening(); } protected override IInputChannel OnAcceptChannel(TimeSpan timeout) { IReplyChannel innerChannel = this.innerChannelListener.AcceptChannel(timeout); return WrapInnerChannel(innerChannel); } protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state) { return this.innerChannelListener.BeginAcceptChannel(timeout, callback, state); } protected override IInputChannel OnEndAcceptChannel(IAsyncResult result) { IReplyChannel innerChannel = this.innerChannelListener.EndAcceptChannel(result); return WrapInnerChannel(innerChannel); } protected override bool OnWaitForChannel(TimeSpan timeout) { return this.innerChannelListener.WaitForChannel(timeout); } protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state) { return this.innerChannelListener.BeginWaitForChannel(timeout, callback, state); } protected override bool OnEndWaitForChannel(IAsyncResult result) { return this.innerChannelListener.EndWaitForChannel(result); } IInputChannel WrapInnerChannel(IReplyChannel innerChannel) { if (innerChannel == null) { return null; } else { return new ReplyOneWayInputChannel(this, innerChannel); } } class ReplyOneWayInputChannel : LayeredChannel , IInputChannel { bool validateHeader; public ReplyOneWayInputChannel(ReplyOneWayChannelListener listener, IReplyChannel innerChannel) : base(listener, innerChannel) { this.validateHeader = listener.packetRoutable; } public EndpointAddress LocalAddress { get { return this.InnerChannel.LocalAddress; } } Message ProcessContext(RequestContext context, TimeSpan timeout) { if (context == null) { return null; } bool replySuccess = false; Message result = null; try { // validate that the request message contains our expected header result = context.RequestMessage; result.Properties.Add(RequestContextMessageProperty.Name, new RequestContextMessageProperty(context)); if (this.validateHeader) { PacketRoutableHeader.ValidateMessage(result); } try { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); context.Reply(null, timeoutHelper.RemainingTime()); replySuccess = true; } catch (CommunicationException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } catch (TimeoutException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } } finally { if (!replySuccess) { context.Abort(); if (result != null) { result.Close(); result = null; } } } return result; } public Message Receive() { return this.Receive(this.DefaultReceiveTimeout); } public Message Receive(TimeSpan timeout) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); RequestContext context = InnerChannel.ReceiveRequest(timeoutHelper.RemainingTime()); return ProcessContext(context, timeoutHelper.RemainingTime()); } public IAsyncResult BeginReceive(AsyncCallback callback, object state) { return this.BeginReceive(this.DefaultReceiveTimeout, callback, state); } public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state) { return new ReceiveAsyncResult(this.InnerChannel, timeout, this.validateHeader, callback, state); } public Message EndReceive(IAsyncResult result) { return ReceiveAsyncResult.End(result); } public bool TryReceive(TimeSpan timeout, out Message message) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); RequestContext context; if (InnerChannel.TryReceiveRequest(timeoutHelper.RemainingTime(), out context)) { message = ProcessContext(context, timeoutHelper.RemainingTime()); return true; } else { message = null; return false; } } public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) { return new TryReceiveAsyncResult(this.InnerChannel, timeout, this.validateHeader, callback, state); } public bool EndTryReceive(IAsyncResult result, out Message message) { return TryReceiveAsyncResult.End(result, out message); } public bool WaitForMessage(TimeSpan timeout) { return InnerChannel.WaitForRequest(timeout); } public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state) { return InnerChannel.BeginWaitForRequest(timeout, callback, state); } public bool EndWaitForMessage(IAsyncResult result) { return InnerChannel.EndWaitForRequest(result); } class TryReceiveAsyncResult : ReceiveAsyncResultBase { bool tryResult; public TryReceiveAsyncResult(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader, AsyncCallback callback, object state) : base(innerChannel, timeout, validateHeader, callback, state) { } public static bool End(IAsyncResult result, out Message message) { TryReceiveAsyncResult thisPtr = AsyncResult.End (result); message = thisPtr.Message; return thisPtr.tryResult; } protected override IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state) { return InnerChannel.BeginTryReceiveRequest(timeout, callback, state); } protected override RequestContext OnEndReceiveRequest(IAsyncResult result) { RequestContext context; this.tryResult = InnerChannel.EndTryReceiveRequest(result, out context); return context; } } class ReceiveAsyncResult : ReceiveAsyncResultBase { public ReceiveAsyncResult(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader, AsyncCallback callback, object state) : base(innerChannel, timeout, validateHeader, callback, state) { } public static Message End(IAsyncResult result) { ReceiveAsyncResult thisPtr = AsyncResult.End (result); return thisPtr.Message; } protected override IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state) { return InnerChannel.BeginReceiveRequest(timeout, callback, state); } protected override RequestContext OnEndReceiveRequest(IAsyncResult result) { return InnerChannel.EndReceiveRequest(result); } } abstract class ReceiveAsyncResultBase : AsyncResult { IReplyChannel innerChannel; RequestContext context; Message message; TimeoutHelper timeoutHelper; bool validateHeader; static AsyncCallback onReceiveRequest = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnReceiveRequest)); static AsyncCallback onReply = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnReply)); protected ReceiveAsyncResultBase(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader, AsyncCallback callback, object state) : base(callback, state) { this.innerChannel = innerChannel; this.timeoutHelper = new TimeoutHelper(timeout); this.validateHeader = validateHeader; IAsyncResult result = this.OnBeginReceiveRequest(timeoutHelper.RemainingTime(), onReceiveRequest, this); if (!result.CompletedSynchronously) { return; } if (HandleReceiveRequestComplete(result)) { base.Complete(true); } } protected IReplyChannel InnerChannel { get { return this.innerChannel; } } protected Message Message { get { return this.message; } } protected abstract IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state); protected abstract RequestContext OnEndReceiveRequest(IAsyncResult result); bool HandleReplyComplete(IAsyncResult result) { bool abortContext = true; try { context.EndReply(result); abortContext = false; } catch (CommunicationException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } catch (TimeoutException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } finally { if (abortContext) { context.Abort(); } } return true; } bool HandleReceiveRequestComplete(IAsyncResult result) { this.context = this.OnEndReceiveRequest(result); if (this.context == null) { return true; } bool replySuccess = false; IAsyncResult replyResult = null; try { this.message = context.RequestMessage; this.message.Properties.Add(RequestContextMessageProperty.Name, new RequestContextMessageProperty(context)); if (validateHeader) { PacketRoutableHeader.ValidateMessage(this.message); } try { replyResult = context.BeginReply(null, timeoutHelper.RemainingTime(), onReply, this); replySuccess = true; } catch (CommunicationException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } catch (TimeoutException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } } finally { if (!replySuccess) { this.context.Abort(); if (this.message != null) { this.message.Close(); this.message = null; } } } if (replyResult == null) { return true; } else if (replyResult.CompletedSynchronously) { return HandleReplyComplete(replyResult); } else { return false; } } static void OnReceiveRequest(IAsyncResult result) { if (result.CompletedSynchronously) { return; } ReceiveAsyncResultBase thisPtr = (ReceiveAsyncResultBase)result.AsyncState; Exception completionException = null; bool completeSelf; try { completeSelf = thisPtr.HandleReceiveRequestComplete(result); } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } completeSelf = true; completionException = e; } if (completeSelf) { thisPtr.Complete(false, completionException); } } static void OnReply(IAsyncResult result) { if (result.CompletedSynchronously) { return; } ReceiveAsyncResultBase thisPtr = (ReceiveAsyncResultBase)result.AsyncState; Exception completionException = null; bool completeSelf; try { completeSelf = thisPtr.HandleReplyComplete(result); } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } completeSelf = true; completionException = e; } if (completeSelf) { thisPtr.Complete(false, completionException); } } } } } /// /// Wraps an IChannelListener class DuplexSessionOneWayChannelListener : DelegatingChannelListenerinto an IChannelListener /// { IChannelListener innerChannelListener; DuplexSessionOneWayInputChannelAcceptor inputChannelAcceptor; bool packetRoutable; int maxAcceptedChannels; bool acceptPending; int activeChannels; TimeSpan idleTimeout; static AsyncCallback onAcceptInnerChannel = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnAcceptInnerChannel)); AsyncCallback onOpenInnerChannel; EventHandler onInnerChannelClosed; ItemDequeuedCallback onExceptionDequeued; WaitCallback handleAcceptCallback; bool ownsInnerListener; object acceptLock; public DuplexSessionOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context) : base(true, context.Binding, context.BuildInnerChannelListener ()) { this.acceptLock = new object(); this.inputChannelAcceptor = new DuplexSessionOneWayInputChannelAcceptor(this); this.packetRoutable = bindingElement.PacketRoutable; this.maxAcceptedChannels = bindingElement.MaxAcceptedChannels; this.Acceptor = this.inputChannelAcceptor; this.idleTimeout = bindingElement.ChannelPoolSettings.IdleTimeout; this.onOpenInnerChannel = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnOpenInnerChannel)); this.ownsInnerListener = true; this.onInnerChannelClosed = new EventHandler(OnInnerChannelClosed); } bool IsAcceptNecessary { get { return !acceptPending && (activeChannels < maxAcceptedChannels) && (this.innerChannelListener.State == CommunicationState.Opened); } } protected override void OnOpening() { this.innerChannelListener = (IChannelListener )this.InnerChannelListener; this.inputChannelAcceptor.TransferInnerChannelListener(this.innerChannelListener); // acceptor now owns the lifetime this.ownsInnerListener = false; base.OnOpening(); } protected override void OnOpened() { base.OnOpened(); IOThreadScheduler.ScheduleCallback(new WaitCallback(AcceptLoop), null); } protected override void OnAbort() { base.OnAbort(); if (this.ownsInnerListener && this.innerChannelListener != null) // Open didn't complete { this.innerChannelListener.Abort(); } } void AcceptLoop(object state) { AcceptLoop(null); } // we need to kick off an accept (and possibly process a completion as well) void AcceptLoop(IAsyncResult pendingResult) { IDuplexSessionChannel pendingChannel = null; if (pendingResult != null) { if (!ProcessEndAccept(pendingResult, out pendingChannel)) { return; } pendingResult = null; } lock (acceptLock) { while (IsAcceptNecessary) { Exception exceptionToEnqueue = null; try { IAsyncResult result = null; try { result = this.innerChannelListener.BeginAcceptChannel(TimeSpan.MaxValue, onAcceptInnerChannel, this); } catch (CommunicationException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } continue; } acceptPending = true; if (!result.CompletedSynchronously) { break; } if (this.handleAcceptCallback == null) { this.handleAcceptCallback = new WaitCallback(HandleAcceptCallback); } if (pendingChannel != null) { // don't starve our completed Accept IOThreadScheduler.ScheduleCallback(handleAcceptCallback, pendingChannel); pendingChannel = null; } IDuplexSessionChannel channel = null; if (ProcessEndAccept(result, out channel)) { if (channel != null) { IOThreadScheduler.ScheduleCallback(handleAcceptCallback, channel); } } else { return; } } #pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } exceptionToEnqueue = e; } if (exceptionToEnqueue != null) { this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null, false); } } } if (pendingChannel != null) { HandleAcceptComplete(pendingChannel); } } // return true if the loop should continue bool ProcessEndAccept(IAsyncResult result, out IDuplexSessionChannel channel) { channel = null; Exception exceptionToEnqueue = null; bool success = false; try { channel = innerChannelListener.EndAcceptChannel(result); success = true; } catch (CommunicationException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } #pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } exceptionToEnqueue = e; } if (success) { if (channel != null) { channel.Closed += this.onInnerChannelClosed; bool traceMaxInboundChannels = false; lock (acceptLock) { this.acceptPending = false; activeChannels++; if (activeChannels >= maxAcceptedChannels) { traceMaxInboundChannels = true; } } if (DiagnosticUtility.ShouldTraceWarning) { if (traceMaxInboundChannels) { TraceUtility.TraceEvent(TraceEventType.Warning, TraceCode.MaxAcceptedChannelsReached, new StringTraceRecord("MaxAcceptedChannels", maxAcceptedChannels.ToString(System.Globalization.CultureInfo.InvariantCulture)), this, null); } } } else { // we're at EOF. close up the Acceptor and break out of our loop this.inputChannelAcceptor.Close(); return false; } } else if (exceptionToEnqueue != null) { // see what the state of the inner listener is. If it's still open, don't block the accept loop bool canDispatchOnThisThread = (innerChannelListener.State != CommunicationState.Opened); if (this.onExceptionDequeued == null) { this.onExceptionDequeued = new ItemDequeuedCallback(OnExceptionDequeued); } this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, this.onExceptionDequeued, canDispatchOnThisThread); } else { lock (acceptLock) { this.acceptPending = false; } } return true; } void OnExceptionDequeued() { lock (acceptLock) { this.acceptPending = false; } AcceptLoop(null); } static void OnAcceptInnerChannel(IAsyncResult result) { if (result.CompletedSynchronously) { return; } DuplexSessionOneWayChannelListener thisPtr = (DuplexSessionOneWayChannelListener)result.AsyncState; thisPtr.AcceptLoop(result); } void HandleAcceptCallback(object state) { this.HandleAcceptComplete((IDuplexSessionChannel)state); } void OnInnerChannelClosed(object sender, EventArgs e) { // Reduce our quota and kick off an accept IDuplexSessionChannel channel = (IDuplexSessionChannel)sender; channel.Closed -= this.onInnerChannelClosed; lock (acceptLock) { activeChannels--; } this.AcceptLoop(null); } void HandleAcceptComplete(IDuplexSessionChannel channel) { Exception exceptionToEnqueue = null; bool success = false; this.inputChannelAcceptor.PrepareChannel(channel); IAsyncResult openResult = null; try { openResult = channel.BeginOpen(this.idleTimeout, onOpenInnerChannel, channel); success = true; } catch (CommunicationException e) // consume CommunicationException { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } catch (TimeoutException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } #pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } exceptionToEnqueue = e; } finally { if (!success && channel != null) { channel.Abort(); } } if (success) { if (openResult.CompletedSynchronously) { CompleteOpen(channel, openResult); } } else { if (exceptionToEnqueue != null) { this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null); } } } void OnOpenInnerChannel(IAsyncResult result) { if (result.CompletedSynchronously) { return; } IDuplexSessionChannel channel = (IDuplexSessionChannel)result.AsyncState; CompleteOpen(channel, result); } // open channel and start receiving messages void CompleteOpen(IDuplexSessionChannel channel, IAsyncResult result) { Exception exceptionToEnqueue = null; bool success = false; try { channel.EndOpen(result); success = true; } catch (CommunicationException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } catch (TimeoutException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } #pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } exceptionToEnqueue = e; } finally { if (!success) { channel.Abort(); } } if (success) { this.inputChannelAcceptor.AcceptInnerChannel(this, channel); } else if (exceptionToEnqueue != null) { this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null); } } class DuplexSessionOneWayInputChannelAcceptor : InputChannelAcceptor { ChannelTracker receivers; IChannelListener innerChannelListener; public DuplexSessionOneWayInputChannelAcceptor(DuplexSessionOneWayChannelListener listener) : base(listener) { this.receivers = new ChannelTracker (); } public void TransferInnerChannelListener(IChannelListener innerChannelListener) { DiagnosticUtility.DebugAssert(this.innerChannelListener == null, "innerChannelListener must be null prior to transfer"); bool abortListener = false; lock (ThisLock) { this.innerChannelListener = innerChannelListener; if (this.State == CommunicationState.Closing || this.State == CommunicationState.Closed) { // abort happened before we completed the transfer abortListener = true; } } if (abortListener) { innerChannelListener.Abort(); } } public void AcceptInnerChannel(DuplexSessionOneWayChannelListener listener, IDuplexSessionChannel channel) { ChannelReceiver channelReceiver = new ChannelReceiver(listener, channel); this.receivers.Add(channel, channelReceiver); channelReceiver.StartReceiving(); } public void PrepareChannel(IDuplexSessionChannel channel) { this.receivers.PrepareChannel(channel); } protected override InputChannel OnCreateChannel() { return new DuplexSessionOneWayInputChannel(this.ChannelManager, null); } protected override void OnOpen(TimeSpan timeout) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); base.OnOpen(timeoutHelper.RemainingTime()); this.receivers.Open(timeoutHelper.RemainingTime()); this.innerChannelListener.Open(timeoutHelper.RemainingTime()); } protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) { return new ChainedOpenAsyncResult(timeout, callback, state, base.OnBeginOpen, base.OnEndOpen, this.receivers, this.innerChannelListener); } protected override void OnEndOpen(IAsyncResult result) { ChainedOpenAsyncResult.End(result); } protected override void OnAbort() { base.OnAbort(); if (!TransferReceivers()) { this.receivers.Abort(); if (this.innerChannelListener != null) { this.innerChannelListener.Abort(); } } } protected override void OnClose(TimeSpan timeout) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); base.OnClose(timeoutHelper.RemainingTime()); if (!TransferReceivers()) { this.receivers.Close(timeoutHelper.RemainingTime()); this.innerChannelListener.Close(timeoutHelper.RemainingTime()); } } protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) { List objectsToClose = new List (); if (!TransferReceivers()) { objectsToClose.Add(this.receivers); objectsToClose.Add(this.innerChannelListener); } return new ChainedCloseAsyncResult(timeout, callback, state, base.OnBeginClose, base.OnEndClose, objectsToClose); } protected override void OnEndClose(IAsyncResult result) { ChainedCloseAsyncResult.End(result); } // used to decouple our channel and listener lifetimes bool TransferReceivers() { DuplexSessionOneWayInputChannel singletonChannel = (DuplexSessionOneWayInputChannel)base.GetCurrentChannel(); if (singletonChannel == null) { return false; } else { return singletonChannel.TransferReceivers(this.receivers, this.innerChannelListener); } } class DuplexSessionOneWayInputChannel : InputChannel { ChannelTracker receivers; IChannelListener innerChannelListener; public DuplexSessionOneWayInputChannel(ChannelManagerBase channelManager, EndpointAddress localAddress) : base(channelManager, localAddress) { } public bool TransferReceivers(ChannelTracker receivers, IChannelListener innerChannelListener) { lock (ThisLock) { if (this.State != CommunicationState.Opened) { return false; } this.receivers = receivers; this.innerChannelListener = innerChannelListener; return true; } } protected override void OnAbort() { if (this.receivers != null) { DiagnosticUtility.DebugAssert(this.innerChannelListener != null, "innerChannelListener and receiver should both be null or non-null"); this.receivers.Abort(); this.innerChannelListener.Abort(); } base.OnAbort(); } protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) { List objectsToClose = new List (); if (this.receivers != null) { objectsToClose.Add(this.receivers); objectsToClose.Add(this.innerChannelListener); } return new ChainedCloseAsyncResult(timeout, callback, state, base.OnBeginClose, base.OnEndClose, objectsToClose); } protected override void OnEndClose(IAsyncResult result) { ChainedCloseAsyncResult.End(result); } protected override void OnClose(TimeSpan timeout) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); if (this.receivers != null) { DiagnosticUtility.DebugAssert(this.innerChannelListener != null, "innerChannelListener and receiver should both be null or non-null"); this.receivers.Close(timeoutHelper.RemainingTime()); this.innerChannelListener.Close(timeoutHelper.RemainingTime()); } base.OnClose(timeoutHelper.RemainingTime()); } } } // given an inner channel, pulls messages off of it and enqueues them into the upper channel class ChannelReceiver { ItemDequeuedCallback onMessageDequeued; static AsyncCallback onReceive = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnReceive)); DuplexSessionOneWayInputChannelAcceptor acceptor; IDuplexSessionChannel channel; TimeSpan idleTimeout; static WaitCallback startReceivingCallback; WaitCallback onStartReceiveLater; WaitCallback onDispatchItemsLater; bool validateHeader; public ChannelReceiver(DuplexSessionOneWayChannelListener parent, IDuplexSessionChannel channel) { this.channel = channel; this.acceptor = parent.inputChannelAcceptor; this.idleTimeout = parent.idleTimeout; this.validateHeader = parent.packetRoutable; this.onMessageDequeued = new ItemDequeuedCallback(OnMessageDequeued); } void StartReceivingCallback(object state) { ((ChannelReceiver)state).StartReceiving(); } public void StartReceiving() { Exception exceptionToEnqueue = null; while (true) { if (channel.State != CommunicationState.Opened) { channel.Abort(); break; } IAsyncResult result = null; try { result = this.channel.BeginTryReceive(this.idleTimeout, onReceive, this); } catch (CommunicationException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } #pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } exceptionToEnqueue = e; break; } if (result != null) { if (!result.CompletedSynchronously) { break; } bool dispatch; bool continueLoop = OnCompleteReceive(result, out dispatch); if (dispatch) { Dispatch(); } if (!continueLoop) { break; } } } if (exceptionToEnqueue != null) { this.acceptor.Enqueue(exceptionToEnqueue, this.onMessageDequeued); } } bool EnqueueMessage(Message message) { if (this.validateHeader) { if (!PacketRoutableHeader.TryValidateMessage(message)) { this.channel.Abort(); message.Close(); return false; } else { this.validateHeader = false; // only validate the first message on a session } } return this.acceptor.EnqueueWithoutDispatch(message, this.onMessageDequeued); } void OnStartReceiveLater(object state) { StartReceiving(); } void OnDispatchItemsLater(object state) { Dispatch(); } void Dispatch() { this.acceptor.DispatchItems(); } // returns true if the Receive Loop should continue (or be started if it's not running) bool OnCompleteReceive(IAsyncResult result, out bool dispatchLater) { Exception exceptionToEnqueue = null; Message message = null; bool startLoop = false; dispatchLater = false; try { if (!this.channel.EndTryReceive(result, out message)) { this.channel.Abort(); // we've hit our IdleTimeout } else if (message == null) { this.channel.Close(); // read EOF, close our half of the session } } catch (CommunicationException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } startLoop = (this.channel.State == CommunicationState.Opened); } catch (TimeoutException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } startLoop = (this.channel.State == CommunicationState.Opened); } #pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } exceptionToEnqueue = e; } if (message != null) { dispatchLater = EnqueueMessage(message); } else if (exceptionToEnqueue != null) { dispatchLater = this.acceptor.EnqueueWithoutDispatch(exceptionToEnqueue, this.onMessageDequeued); } return startLoop; } void OnMessageDequeued() { IAsyncResult result = null; Exception exceptionToEnqueue = null; try { result = this.channel.BeginTryReceive(this.idleTimeout, onReceive, this); } catch (CommunicationException e) { if (DiagnosticUtility.ShouldTraceInformation) { DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information); } } #pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } exceptionToEnqueue = e; } if (result != null) { if (result.CompletedSynchronously) { bool dispatchLater; if (OnCompleteReceive(result, out dispatchLater)) { if (onStartReceiveLater == null) { onStartReceiveLater = new WaitCallback(OnStartReceiveLater); } IOThreadScheduler.ScheduleCallback(onStartReceiveLater, null); } if (dispatchLater) { if (onDispatchItemsLater == null) { onDispatchItemsLater = new WaitCallback(OnDispatchItemsLater); } IOThreadScheduler.ScheduleCallback(onDispatchItemsLater, null); } } } else if (exceptionToEnqueue != null) { this.acceptor.Enqueue(exceptionToEnqueue, this.onMessageDequeued, false); } else // need to kickoff a new loop { if (this.channel.State == CommunicationState.Opened) { if (startReceivingCallback == null) { startReceivingCallback = new WaitCallback(StartReceivingCallback); } IOThreadScheduler.ScheduleCallback(startReceivingCallback, this); } } } static void OnReceive(IAsyncResult result) { if (result.CompletedSynchronously) { return; } ChannelReceiver thisPtr = (ChannelReceiver)result.AsyncState; bool dispatch; if (thisPtr.OnCompleteReceive(result, out dispatch)) { thisPtr.StartReceiving(); } if (dispatch) { thisPtr.Dispatch(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- PackageProperties.cs
- UInt32Storage.cs
- Message.cs
- CheckoutException.cs
- RecognitionEventArgs.cs
- HttpInputStream.cs
- Normalization.cs
- DataControlFieldCell.cs
- x509store.cs
- UmAlQuraCalendar.cs
- MessageQuerySet.cs
- PenThreadWorker.cs
- ValidatedControlConverter.cs
- IdentityManager.cs
- NotifyCollectionChangedEventArgs.cs
- HwndAppCommandInputProvider.cs
- AuthenticationConfig.cs
- StreamGeometry.cs
- StoreAnnotationsMap.cs
- CacheOutputQuery.cs
- SemanticValue.cs
- ZipIOFileItemStream.cs
- UidManager.cs
- InkCanvasInnerCanvas.cs
- DocumentApplication.cs
- RenderingBiasValidation.cs
- FontFamily.cs
- SessionPageStatePersister.cs
- FormsIdentity.cs
- GridViewSortEventArgs.cs
- XmlHierarchicalEnumerable.cs
- OdbcConnectionHandle.cs
- FlowStep.cs
- ContextTokenTypeConverter.cs
- URI.cs
- MethodExpr.cs
- SplitterPanel.cs
- DebugView.cs
- TreeView.cs
- ModelPropertyImpl.cs
- SoapClientMessage.cs
- WindowInteractionStateTracker.cs
- TrimSurroundingWhitespaceAttribute.cs
- Panel.cs
- TransformProviderWrapper.cs
- Wizard.cs
- InputElement.cs
- CodeSubDirectory.cs
- PersonalizableAttribute.cs
- unsafeIndexingFilterStream.cs
- OleDbPermission.cs
- WindowProviderWrapper.cs
- SpAudioStreamWrapper.cs
- ResourcesChangeInfo.cs
- DefaultPerformanceCounters.cs
- GridViewAutomationPeer.cs
- DrawListViewColumnHeaderEventArgs.cs
- CalendarDataBindingHandler.cs
- PropertyOrder.cs
- ComponentResourceManager.cs
- ResourceDefaultValueAttribute.cs
- SoapAttributeAttribute.cs
- X509SubjectKeyIdentifierClause.cs
- OperationResponse.cs
- MimeWriter.cs
- SyntaxCheck.cs
- ExpressionHelper.cs
- ObjectHelper.cs
- CompiledRegexRunnerFactory.cs
- DefaultMemberAttribute.cs
- ZipIOExtraField.cs
- RemoteDebugger.cs
- ToolStripMenuItem.cs
- EntityDataReader.cs
- HostingEnvironmentException.cs
- HuffmanTree.cs
- DataTable.cs
- ElementMarkupObject.cs
- Variable.cs
- JulianCalendar.cs
- InputProviderSite.cs
- HtmlTableRow.cs
- HttpWebResponse.cs
- EventLogPermissionEntryCollection.cs
- SerializerWriterEventHandlers.cs
- WriteableBitmap.cs
- WebPartEditorApplyVerb.cs
- InputManager.cs
- ValidationErrorCollection.cs
- followingquery.cs
- StyleTypedPropertyAttribute.cs
- WSHttpBindingBaseElement.cs
- ByteRangeDownloader.cs
- TreeViewImageIndexConverter.cs
- FontSourceCollection.cs
- ToolStripContainer.cs
- XamlWriter.cs
- WindowsGrip.cs
- ErrorCodes.cs
- StringUtil.cs