Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / ReliableChannelListener.cs / 1 / ReliableChannelListener.cs
//---------------------------------------------------------------------------- // Copyright (c) Microsoft Corporation. All rights reserved. //--------------------------------------------------------------------------- namespace System.ServiceModel.Channels { using System.Collections.Generic; using System.ServiceModel; using System.ServiceModel.Dispatcher; using System.Collections.ObjectModel; using System.Diagnostics; using System.Globalization; using System.ServiceModel.Diagnostics; using System.ServiceModel.Security; using System.Threading; using System.Xml; // This class tracks the lifetime of the InnerChannelListener (ICL). The ICL must be kept open // as long as some communication object uses it. ReliableChannelListener (RCL) and all the // channels it produces use it. The RCL + the channel count forms a ref count. If the ref // count is 1, the object that wishes to close (since it is the last object to release the // reference) must also close the ICL. If the ref count is 0 any object may abort the ICL. This // means the last closing object may not release its reference until after the ICL's close. abstract class ReliableChannelListenerBase: DelegatingChannelListener , IReliableFactorySettings where TChannel: class, IChannel { TimeSpan acknowledgementInterval; bool closed = false; FaultHelper faultHelper; bool flowControlEnabled; TimeSpan inactivityTimeout; IMessageFilterTable localAddresses; int maxPendingChannels; int maxRetryCount; int maxTransferWindowSize; MessageVersion messageVersion; bool ordered; ReliableMessagingVersion reliableMessagingVersion; protected ReliableChannelListenerBase(ReliableSessionBindingElement settings, Binding binding) : base(true, binding) { this.acknowledgementInterval = settings.AcknowledgementInterval; this.flowControlEnabled = settings.FlowControlEnabled; this.inactivityTimeout = settings.InactivityTimeout; this.maxPendingChannels = settings.MaxPendingChannels; this.maxRetryCount = settings.MaxRetryCount; this.maxTransferWindowSize = settings.MaxTransferWindowSize; this.messageVersion = binding.MessageVersion; this.ordered = settings.Ordered; this.reliableMessagingVersion = settings.ReliableMessagingVersion; } public TimeSpan AcknowledgementInterval { get { return this.acknowledgementInterval; } } protected FaultHelper FaultHelper { get { return this.faultHelper; } set { this.faultHelper = value; } } public bool FlowControlEnabled { get { return this.flowControlEnabled; } } public TimeSpan InactivityTimeout { get { return this.inactivityTimeout; } } // Must call under lock. protected bool IsAccepting { get { return this.State == CommunicationState.Opened; } } public IMessageFilterTable LocalAddresses { get { return this.localAddresses; } set { this.localAddresses = value; } } public int MaxPendingChannels { get { return this.maxPendingChannels; } } public int MaxRetryCount { get { return this.maxRetryCount; } } public int MaxTransferWindowSize { get { return this.maxTransferWindowSize; } } public MessageVersion MessageVersion { get { return this.messageVersion; } } public bool Ordered { get { return this.ordered; } } public ReliableMessagingVersion ReliableMessagingVersion { get { return this.reliableMessagingVersion; } } public TimeSpan SendTimeout { get { return this.InternalSendTimeout; } } protected abstract bool Duplex { get; } // Must call under lock. protected abstract bool HasChannels(); // Must call under lock. Must call after the ReliableChannelListener has been opened. protected abstract bool IsLastChannel(UniqueId inputId); protected override void OnAbort() { bool abortInnerChannelListener; lock (this.ThisLock) { this.closed = true; abortInnerChannelListener = !this.HasChannels(); } if (abortInnerChannelListener) { this.AbortInnerListener(); } base.OnAbort(); } protected virtual void AbortInnerListener() { this.faultHelper.Abort(); this.InnerChannelListener.Abort(); } protected virtual void CloseInnerListener(TimeSpan timeout) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); this.faultHelper.Close(timeoutHelper.RemainingTime()); this.InnerChannelListener.Close(timeoutHelper.RemainingTime()); } protected virtual IAsyncResult BeginCloseInnerListener(TimeSpan timeout, AsyncCallback callback, object state) { OperationWithTimeoutBeginCallback[] beginOperations = new OperationWithTimeoutBeginCallback[] { new OperationWithTimeoutBeginCallback(this.faultHelper.BeginClose), new OperationWithTimeoutBeginCallback(this.InnerChannelListener.BeginClose) }; OperationEndCallback[] endOperations = new OperationEndCallback[] { new OperationEndCallback(this.faultHelper.EndClose), new OperationEndCallback(this.InnerChannelListener.EndClose) }; return OperationWithTimeoutComposer.BeginComposeAsyncOperations(timeout, beginOperations, endOperations, callback, state); } protected virtual void EndCloseInnerListener(IAsyncResult result) { OperationWithTimeoutComposer.EndComposeAsyncOperations(result); } protected override void OnClose(TimeSpan timeout) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); if (this.ShouldCloseOnChannelListenerClose()) { this.CloseInnerListener(timeoutHelper.RemainingTime()); this.closed = true; } base.OnClose(timeoutHelper.RemainingTime()); } protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) { return new CloseAsyncResult(this, base.OnBeginClose, base.OnEndClose, timeout, callback, state); } protected override void OnEndClose(IAsyncResult result) { CloseAsyncResult.End(result); } protected override void OnOpen(TimeSpan timeout) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); base.OnOpen(timeoutHelper.RemainingTime()); this.InnerChannelListener.Open(timeoutHelper.RemainingTime()); } protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) { return OperationWithTimeoutComposer.BeginComposeAsyncOperations(timeout, new OperationWithTimeoutBeginCallback[] { new OperationWithTimeoutBeginCallback(base.OnBeginOpen), new OperationWithTimeoutBeginCallback(this.InnerChannelListener.BeginOpen) }, new OperationEndCallback[] { new OperationEndCallback(base.OnEndOpen), new OperationEndCallback(this.InnerChannelListener.EndOpen)}, callback, state); } protected override void OnEndOpen(IAsyncResult result) { OperationWithTimeoutComposer.EndComposeAsyncOperations(result); } public void OnReliableChannelAbort(UniqueId inputId, UniqueId outputId) { lock (this.ThisLock) { this.RemoveChannel(inputId, outputId); if (!this.closed || this.HasChannels()) { return; } } this.AbortInnerListener(); } public void OnReliableChannelClose(UniqueId inputId, UniqueId outputId, TimeSpan timeout) { if (this.ShouldCloseOnReliableChannelClose(inputId, outputId)) { this.CloseInnerListener(timeout); lock (this.ThisLock) { this.RemoveChannel(inputId, outputId); } } } public IAsyncResult OnReliableChannelBeginClose(UniqueId inputId, UniqueId outputId, TimeSpan timeout, AsyncCallback callback, object state) { return new OnReliableChannelCloseAsyncResult(this, inputId, outputId, timeout, callback, state); } public void OnReliableChannelEndClose(IAsyncResult result) { OnReliableChannelCloseAsyncResult.End(result); } // Must call under lock. protected abstract void RemoveChannel(UniqueId inputId, UniqueId outputId); bool ShouldCloseOnChannelListenerClose() { lock (this.ThisLock) { if (!this.HasChannels()) { return true; } else { this.closed = true; return false; } } } bool ShouldCloseOnReliableChannelClose(UniqueId inputId, UniqueId outputId) { lock (this.ThisLock) { if (this.closed && this.IsLastChannel(inputId)) { return true; } else { this.RemoveChannel(inputId, outputId); return false; } } } class CloseAsyncResult : AsyncResult { OperationWithTimeoutBeginCallback baseBeginClose; OperationEndCallback baseEndClose; ReliableChannelListenerBase parent; TimeoutHelper timeoutHelper; static AsyncCallback onBaseChannelListenerCloseComplete = DiagnosticUtility.ThunkAsyncCallback(OnBaseChannelListenerCloseCompleteStatic); static AsyncCallback onInnerChannelListenerCloseComplete = DiagnosticUtility.ThunkAsyncCallback(OnInnerChannelListenerCloseCompleteStatic); public CloseAsyncResult(ReliableChannelListenerBase parent, OperationWithTimeoutBeginCallback baseBeginClose, OperationEndCallback baseEndClose, TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { this.parent = parent; this.baseBeginClose = baseBeginClose; this.baseEndClose = baseEndClose; bool complete = false; if (this.parent.ShouldCloseOnChannelListenerClose()) { this.timeoutHelper = new TimeoutHelper(timeout); IAsyncResult result = this.parent.BeginCloseInnerListener( timeoutHelper.RemainingTime(), onInnerChannelListenerCloseComplete, this); if (result.CompletedSynchronously) { complete = this.CompleteInnerChannelListenerClose(result); } } else { complete = this.CloseBaseChannelListener(timeout); } if (complete) { this.Complete(true); } } bool CloseBaseChannelListener(TimeSpan timeout) { IAsyncResult result = this.baseBeginClose(timeout, onBaseChannelListenerCloseComplete, this); if (result.CompletedSynchronously) { this.baseEndClose(result); return true; } else { return false; } } bool CompleteInnerChannelListenerClose(IAsyncResult result) { this.parent.EndCloseInnerListener(result); this.parent.closed = true; this.parent.faultHelper.Abort(); return this.CloseBaseChannelListener(this.timeoutHelper.RemainingTime()); } public static void End(IAsyncResult result) { AsyncResult.End (result); } void OnBaseChannelListenerCloseComplete(IAsyncResult result) { Exception completeException = null; try { this.baseEndClose(result); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } completeException = e; } this.Complete(false, completeException); } static void OnBaseChannelListenerCloseCompleteStatic(IAsyncResult result) { if (!result.CompletedSynchronously) { CloseAsyncResult closeResult = (CloseAsyncResult)result.AsyncState; closeResult.OnBaseChannelListenerCloseComplete(result); } } void OnInnerChannelListenerCloseComplete(IAsyncResult result) { bool complete; Exception completeException = null; try { complete = this.CompleteInnerChannelListenerClose(result); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } complete = true; completeException = e; } if (complete) { this.Complete(false, completeException); } } static void OnInnerChannelListenerCloseCompleteStatic(IAsyncResult result) { if (!result.CompletedSynchronously) { CloseAsyncResult closeResult = (CloseAsyncResult)result.AsyncState; closeResult.OnInnerChannelListenerCloseComplete(result); } } } class OnReliableChannelCloseAsyncResult : AsyncResult { ReliableChannelListenerBase channelListener; UniqueId inputId; UniqueId outputId; static AsyncCallback onInnerChannelListenerCloseComplete = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnInnerChannelListenerCloseCompleteStatic)); public OnReliableChannelCloseAsyncResult( ReliableChannelListenerBase channelListener, UniqueId inputId, UniqueId outputId, TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { if (!channelListener.ShouldCloseOnReliableChannelClose(inputId, outputId)) { this.Complete(true); return; } this.channelListener = channelListener; this.inputId = inputId; this.outputId = outputId; IAsyncResult result = this.channelListener.BeginCloseInnerListener(timeout, onInnerChannelListenerCloseComplete, this); if (result.CompletedSynchronously) { this.CompleteInnerChannelListenerClose(result); this.Complete(true); } } void CompleteInnerChannelListenerClose(IAsyncResult result) { this.channelListener.EndCloseInnerListener(result); lock (this.channelListener.ThisLock) { this.channelListener.RemoveChannel(this.inputId, this.outputId); } } public static void End(IAsyncResult result) { AsyncResult.End (result); } void OnInnerChannelListenerCloseComplete(IAsyncResult result) { Exception completeException = null; try { this.CompleteInnerChannelListenerClose(result); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } completeException = e; } this.Complete(false, completeException); } static void OnInnerChannelListenerCloseCompleteStatic(IAsyncResult result) { if (!result.CompletedSynchronously) { OnReliableChannelCloseAsyncResult closeResult = (OnReliableChannelCloseAsyncResult)result.AsyncState; closeResult.OnInnerChannelListenerCloseComplete(result); } } } } abstract class ReliableChannelListener : ReliableChannelListenerBase where TChannel : class, IChannel where TReliableChannel : class, IChannel where TInnerChannel : class, IChannel { Dictionary channelsByInput; Dictionary channelsByOutput; InputQueueChannelAcceptor inputQueueChannelAcceptor; static AsyncCallback onAcceptCompleted = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnAcceptCompletedStatic)); IChannelListener typedListener; protected ReliableChannelListener(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context.Binding) { this.typedListener = context.BuildInnerChannelListener (); this.inputQueueChannelAcceptor = new InputQueueChannelAcceptor (this); this.Acceptor = this.inputQueueChannelAcceptor; } internal override IChannelListener InnerChannelListener { get { return this.typedListener; } set { // until the public setter is removed, throw throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException()); } } IServerReliableChannelBinder CreateBinder(TInnerChannel channel, EndpointAddress localAddress, EndpointAddress remoteAddress) { return ServerReliableChannelBinder .CreateBinder(channel, localAddress, remoteAddress, TolerateFaultsMode.IfNotSecuritySession, this.DefaultCloseTimeout, this.DefaultSendTimeout); } protected abstract TReliableChannel CreateChannel(UniqueId id, CreateSequenceInfo createSequenceInfo, IServerReliableChannelBinder binder); protected void Dispatch() { this.inputQueueChannelAcceptor.Dispatch(); } // override to hook up events, etc pre-Open protected virtual void OnInnerChannelAccepted(TInnerChannel channel) { } protected bool EnqueueWithoutDispatch(TChannel channel) { return this.inputQueueChannelAcceptor.EnqueueWithoutDispatch(channel, null); } protected TReliableChannel GetChannel(WsrmMessageInfo info, out UniqueId id) { id = WsrmUtilities.GetInputId(info); lock (this.ThisLock) { TReliableChannel channel = null; if ((id == null) || !this.channelsByInput.TryGetValue(id, out channel)) { if (this.Duplex) { UniqueId outputId = WsrmUtilities.GetOutputId(this.ReliableMessagingVersion, info); if (outputId != null) { id = outputId; this.channelsByOutput.TryGetValue(id, out channel); } } } return channel; } } void HandleAcceptComplete(TInnerChannel channel) { if (channel == null) { return; } try { OnInnerChannelAccepted(channel); channel.Open(); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; channel.Abort(); return; } this.ProcessChannel(channel); } protected bool HandleException(Exception e, ICommunicationObject o) { if (e is CommunicationException || e is TimeoutException) { return (o.State == CommunicationState.Opened); } return false; } // Must call under lock. protected override bool HasChannels() { return (this.channelsByInput == null) ? false : (this.channelsByInput.Count > 0); } bool IsExpectedException(Exception e) { if (e is ProtocolException) { return false; } else { return e is CommunicationException; } } // Must call under lock. Must call after the ReliableChannelListener has been opened. protected override bool IsLastChannel(UniqueId inputId) { return (this.channelsByInput.Count == 1) ? channelsByInput.ContainsKey(inputId) : false; } void OnAcceptCompleted(IAsyncResult result) { TInnerChannel channel = null; Exception expectedException = null; Exception unexpectedException = null; try { channel = this.typedListener.EndAcceptChannel(result); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; if (this.IsExpectedException(e)) { expectedException = e; } else { unexpectedException = e; } } if (channel != null) { this.HandleAcceptComplete(channel); this.StartAccepting(); } else if (unexpectedException != null) { this.Fault(unexpectedException); } else if ((expectedException != null) && (this.typedListener.State == CommunicationState.Opened)) { this.StartAccepting(); } else if (this.typedListener.State == CommunicationState.Faulted) { this.Fault(expectedException); } } static void OnAcceptCompletedStatic(IAsyncResult result) { if (!result.CompletedSynchronously) { ReliableChannelListener listener = (ReliableChannelListener )result.AsyncState; try { listener.OnAcceptCompleted(result); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; listener.Fault(e); } } } protected override void OnFaulted() { this.typedListener.Abort(); this.inputQueueChannelAcceptor.FaultQueue(); base.OnFaulted(); } protected override void OnOpened() { base.OnOpened(); this.channelsByInput = new Dictionary (); if (this.Duplex) this.channelsByOutput = new Dictionary (); if (Thread.CurrentThread.IsThreadPoolThread) { try { StartAccepting(); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; this.Fault(e); } } else { IOThreadScheduler.ScheduleCallback(new WaitCallback(StartAccepting), this); } } protected TReliableChannel ProcessCreateSequence(WsrmMessageInfo info, TInnerChannel channel, out bool dispatch, out bool newChannel) { dispatch = false; newChannel = false; CreateSequenceInfo createSequenceInfo = info.CreateSequenceInfo; EndpointAddress acksTo; if (!WsrmUtilities.ValidateCreateSequence (info, this, channel, out acksTo)) return null; lock (this.ThisLock) { UniqueId id; TReliableChannel reliableChannel = null; if ((createSequenceInfo.OfferIdentifier != null) && this.Duplex && this.channelsByOutput.TryGetValue(createSequenceInfo.OfferIdentifier, out reliableChannel)) { return reliableChannel; } if (!this.IsAccepting) { info.FaultReply = WsrmUtilities.CreateEndpointNotFoundFault(this.MessageVersion, SR.GetString(SR.RMEndpointNotFoundReason, this.Uri)); return null; } if (this.inputQueueChannelAcceptor.PendingCount >= this.MaxPendingChannels) { info.FaultReply = WsrmUtilities.CreateCSRefusedServerTooBusyFault(this.MessageVersion, this.ReliableMessagingVersion, SR.GetString(SR.ServerTooBusy, this.Uri)); return null; } id = WsrmUtilities.NextSequenceId(); reliableChannel = this.CreateChannel(id, createSequenceInfo, this.CreateBinder(channel, acksTo, createSequenceInfo.ReplyTo)); this.channelsByInput.Add(id, reliableChannel); if (this.Duplex) this.channelsByOutput.Add(createSequenceInfo.OfferIdentifier, reliableChannel); dispatch = this.EnqueueWithoutDispatch((TChannel)(object)reliableChannel); newChannel = true; return reliableChannel; } } protected abstract void ProcessChannel(TInnerChannel channel); // Must call under lock. protected override void RemoveChannel(UniqueId inputId, UniqueId outputId) { this.channelsByInput.Remove(inputId); if (this.Duplex) this.channelsByOutput.Remove(outputId); } void StartAccepting() { Exception expectedException = null; Exception unexpectedException = null; while (this.typedListener.State == CommunicationState.Opened) { TInnerChannel channel = null; expectedException = null; unexpectedException = null; try { IAsyncResult result = this.typedListener.BeginAcceptChannel(TimeSpan.MaxValue, onAcceptCompleted, this); if (!result.CompletedSynchronously) return; channel = this.typedListener.EndAcceptChannel(result); if (channel == null) break; } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; if (this.IsExpectedException(e)) { expectedException = e; continue; } else { unexpectedException = e; break; } } this.HandleAcceptComplete(channel); } if (unexpectedException != null) { this.Fault(unexpectedException); } else if (this.typedListener.State == CommunicationState.Faulted) { this.Fault(expectedException); } } static void StartAccepting(object state) { ReliableChannelListener channelListener = (ReliableChannelListener )state; try { channelListener.StartAccepting(); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; channelListener.Fault(e); } } } abstract class ReliableListenerOverDatagram : ReliableChannelListener where TChannel : class, IChannel where TReliableChannel : class, IChannel where TInnerChannel : class, IChannel where TItem : class, IDisposable { WaitCallback asyncHandleReceiveComplete; AsyncCallback onTryReceiveComplete; ChannelTracker channelTracker; protected ReliableListenerOverDatagram(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { this.asyncHandleReceiveComplete = new WaitCallback(this.AsyncHandleReceiveComplete); this.onTryReceiveComplete = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(this.OnTryReceiveComplete)); this.channelTracker = new ChannelTracker (); } void AsyncHandleReceiveComplete(object state) { try { IAsyncResult result = (IAsyncResult)state; TInnerChannel channel = (TInnerChannel)result.AsyncState; TItem item = null; try { this.EndTryReceiveItem(channel, result, out item); if (item == null) return; } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; if (!this.HandleException(e, channel)) { channel.Abort(); return; } } if (item != null && this.HandleReceiveComplete(item, channel)) StartReceiving(channel, true); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; this.Fault(e); } } bool BeginProcessItem(TItem item, WsrmMessageInfo info, TInnerChannel channel, out TReliableChannel reliableChannel, out bool newChannel, out bool dispatch) { dispatch = false; reliableChannel = null; newChannel = false; Message faultReply; if (info.FaultReply != null) { faultReply = info.FaultReply; } else if (info.CreateSequenceInfo == null) { UniqueId id; reliableChannel = this.GetChannel(info, out id); if (reliableChannel != null) return true; if (id == null) { this.DisposeItem(item); return true; } faultReply = new UnknownSequenceFault(id).CreateMessage(this.MessageVersion, this.ReliableMessagingVersion); } else { reliableChannel = this.ProcessCreateSequence(info, channel, out dispatch, out newChannel); if (reliableChannel != null) return true; faultReply = info.FaultReply; } try { this.SendReply(faultReply, channel, item); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; if (!this.HandleException(e, channel)) { channel.Abort(); return false; } } finally { faultReply.Close(); this.DisposeItem(item); } return true; } protected abstract IAsyncResult BeginTryReceiveItem(TInnerChannel channel, AsyncCallback callback, object state); protected abstract void DisposeItem(TItem item); protected abstract void EndTryReceiveItem(TInnerChannel channel, IAsyncResult result, out TItem item); void EndProcessItem(TItem item, WsrmMessageInfo info, TReliableChannel channel, bool dispatch) { this.ProcessSequencedItem(channel, item, info); if (dispatch) this.Dispatch(); } protected abstract Message GetMessage(TItem item); bool HandleReceiveComplete(TItem item, TInnerChannel channel) { Message message = null; // Minimalist fix for MB60747: GetMessage can call RequestContext.RequestMessage which can throw. // If we can handle the exception then keep the receive loop going. try { message = this.GetMessage(item); } catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; if (!this.HandleException(e, this)) throw; item.Dispose(); return true; } WsrmMessageInfo info = WsrmMessageInfo.Get(this.MessageVersion, this.ReliableMessagingVersion, channel, null, message); if (info.ParsingException != null) { this.DisposeItem(item); return true; } TReliableChannel reliableChannel; bool newChannel; bool dispatch; if (!this.BeginProcessItem(item, info, channel, out reliableChannel, out newChannel, out dispatch)) return false; if (reliableChannel == null) { this.DisposeItem(item); return true; } // On the one hand the contract of HandleReceiveComplete is that it won't stop the receive loop; // it can block, but it will ensure the loop doesn't stall. // On the other hand we don't want to take on the cost of blindly jumping threads. // So, if we know EndProcessItem might block (dispatch || !newChannel) then we // try another receive. If that completes async then we know it is safe for us to block, // if not then we force the receive to complete async and *make* it safe for us to block. if (dispatch || !newChannel) { this.StartReceiving(channel, false); this.EndProcessItem(item, info, reliableChannel, dispatch); return false; } else { this.EndProcessItem(item, info, reliableChannel, dispatch); return true; } } void OnTryReceiveComplete(IAsyncResult result) { if (!result.CompletedSynchronously) { try { TInnerChannel channel = (TInnerChannel)result.AsyncState; TItem item = null; try { this.EndTryReceiveItem(channel, result, out item); if (item == null) return; } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; if (!this.HandleException(e, channel)) { channel.Abort(); return; } } if (item != null && this.HandleReceiveComplete(item, channel)) StartReceiving(channel, true); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; this.Fault(e); } } } protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) { return new ChainedAsyncResult(timeout, callback, state, this.channelTracker.BeginOpen, this.channelTracker.EndOpen, base.OnBeginOpen, base.OnEndOpen); } protected override void OnEndOpen(IAsyncResult result) { ChainedAsyncResult.End(result); } protected override void OnOpen(TimeSpan timeout) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); this.channelTracker.Open(timeoutHelper.RemainingTime()); base.OnOpen(timeoutHelper.RemainingTime()); } protected override void OnInnerChannelAccepted(TInnerChannel channel) { base.OnInnerChannelAccepted(channel); this.channelTracker.PrepareChannel(channel); } protected override void ProcessChannel(TInnerChannel channel) { try { this.channelTracker.Add(channel, null); this.StartReceiving(channel, false); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; this.Fault(e); } } protected override void AbortInnerListener() { base.AbortInnerListener(); this.channelTracker.Abort(); } protected override void CloseInnerListener(TimeSpan timeout) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); base.CloseInnerListener(timeoutHelper.RemainingTime()); this.channelTracker.Close(timeoutHelper.RemainingTime()); } protected override IAsyncResult BeginCloseInnerListener(TimeSpan timeout, AsyncCallback callback, object state) { return new ChainedAsyncResult(timeout, callback, state, base.BeginCloseInnerListener, base.EndCloseInnerListener, channelTracker.BeginClose, channelTracker.EndClose); } protected override void EndCloseInnerListener(IAsyncResult result) { ChainedAsyncResult.End(result); } protected abstract void ProcessSequencedItem(TReliableChannel reliableChannel, TItem item, WsrmMessageInfo info); protected abstract void SendReply(Message reply, TInnerChannel channel, TItem item); void StartReceiving(TInnerChannel channel, bool canBlock) { while (true) { TItem item = null; try { IAsyncResult result = this.BeginTryReceiveItem(channel, this.onTryReceiveComplete, channel); if (!result.CompletedSynchronously) break; if (!canBlock) { IOThreadScheduler.ScheduleCallback(this.asyncHandleReceiveComplete, result); break; } this.EndTryReceiveItem(channel, result, out item); if (item == null) break; } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; if (!this.HandleException(e, channel)) { channel.Abort(); break; } } if (item != null && !this.HandleReceiveComplete(item, channel)) break; } } } abstract class ReliableListenerOverDuplex : ReliableListenerOverDatagram where TChannel : class, IChannel where TReliableChannel : class, IChannel { protected ReliableListenerOverDuplex(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { this.FaultHelper = new SendFaultHelper(context.Binding.SendTimeout, context.Binding.CloseTimeout); } protected override IAsyncResult BeginTryReceiveItem(IDuplexChannel channel, AsyncCallback callback, object state) { return channel.BeginTryReceive(TimeSpan.MaxValue, callback, state); } protected override void DisposeItem(Message item) { ((IDisposable)item).Dispose(); } protected override void EndTryReceiveItem(IDuplexChannel channel, IAsyncResult result, out Message item) { channel.EndTryReceive(result, out item); } protected override Message GetMessage(Message item) { return item; } protected override void SendReply(Message reply, IDuplexChannel channel, Message item) { if (FaultHelper.AddressReply(item, reply)) channel.Send(reply); } } abstract class ReliableListenerOverReply : ReliableListenerOverDatagram where TChannel : class, IChannel where TReliableChannel : class, IChannel { protected ReliableListenerOverReply(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { this.FaultHelper = new ReplyFaultHelper(context.Binding.SendTimeout, context.Binding.CloseTimeout); } protected override IAsyncResult BeginTryReceiveItem(IReplyChannel channel, AsyncCallback callback, object state) { return channel.BeginTryReceiveRequest(TimeSpan.MaxValue, callback, state); } protected override void DisposeItem(RequestContext item) { ((IDisposable)item.RequestMessage).Dispose(); ((IDisposable)item).Dispose(); } protected override void EndTryReceiveItem(IReplyChannel channel, IAsyncResult result, out RequestContext item) { channel.EndTryReceiveRequest(result, out item); } protected override Message GetMessage(RequestContext item) { return item.RequestMessage; } protected override void SendReply(Message reply, IReplyChannel channel, RequestContext item) { if (FaultHelper.AddressReply(item.RequestMessage, reply)) item.Reply(reply); } } abstract class ReliableListenerOverSession : ReliableChannelListener where TChannel : class, IChannel where TReliableChannel : class, IChannel where TInnerChannel : class, IChannel, ISessionChannel where TInnerSession : ISession where TItem : IDisposable { WaitCallback asyncHandleReceiveComplete; AsyncCallback onReceiveComplete; protected ReliableListenerOverSession(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { this.asyncHandleReceiveComplete = new WaitCallback(this.AsyncHandleReceiveComplete); this.onReceiveComplete = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(this.OnReceiveComplete)); } void AsyncHandleReceiveComplete(object state) { try { IAsyncResult result = (IAsyncResult)state; TInnerChannel channel = (TInnerChannel)result.AsyncState; TItem item = default(TItem); try { this.EndTryReceiveItem(channel, result, out item); if (item == null) { channel.Close(); return; } } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; if (!this.HandleException(e, channel)) { channel.Abort(); return; } } if (item != null) this.HandleReceiveComplete(item, channel); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; this.Fault(e); } } protected abstract IAsyncResult BeginTryReceiveItem(TInnerChannel channel, AsyncCallback callback, object state); protected abstract void DisposeItem(TItem item); protected abstract void EndTryReceiveItem(TInnerChannel channel, IAsyncResult result, out TItem item); protected abstract Message GetMessage(TItem item); void HandleReceiveComplete(TItem item, TInnerChannel channel) { WsrmMessageInfo info = WsrmMessageInfo.Get(this.MessageVersion, this.ReliableMessagingVersion, channel, channel.Session as ISecureConversationSession, this.GetMessage(item)); if (info.ParsingException != null) { this.DisposeItem(item); channel.Abort(); return; } TReliableChannel reliableChannel = null; bool dispatch = false; bool newChannel = false; Message faultReply = null; if (info.FaultReply != null) { faultReply = info.FaultReply; } else if (info.CreateSequenceInfo == null) { UniqueId id; reliableChannel = this.GetChannel(info, out id); if ((reliableChannel == null) && (id == null)) { this.DisposeItem(item); channel.Abort(); return; } if (reliableChannel == null) faultReply = new UnknownSequenceFault(id).CreateMessage(this.MessageVersion, this.ReliableMessagingVersion); } else { reliableChannel = this.ProcessCreateSequence(info, channel, out dispatch, out newChannel); if (reliableChannel == null) faultReply = info.FaultReply; } if (reliableChannel != null) { this.ProcessSequencedItem(channel, item, reliableChannel, info, newChannel); if (dispatch) this.Dispatch(); } else { try { this.SendReply(faultReply, channel, item); channel.Close(); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; channel.Abort(); } finally { faultReply.Close(); this.DisposeItem(item); } } } void OnReceiveComplete(IAsyncResult result) { if (!result.CompletedSynchronously) { try { TInnerChannel channel = (TInnerChannel)result.AsyncState; TItem item = default(TItem); try { this.EndTryReceiveItem(channel, result, out item); if (item == null) { channel.Close(); return; } } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; if (!this.HandleException(e, channel)) { channel.Abort(); return; } } if (item != null) this.HandleReceiveComplete(item, channel); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; this.Fault(e); } } } protected override void ProcessChannel(TInnerChannel channel) { try { IAsyncResult result = this.BeginTryReceiveItem(channel, this.onReceiveComplete, channel); if (result.CompletedSynchronously) IOThreadScheduler.ScheduleCallback(this.asyncHandleReceiveComplete, result); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; channel.Abort(); return; } } protected abstract void ProcessSequencedItem(TInnerChannel channel, TItem item, TReliableChannel reliableChannel, WsrmMessageInfo info, bool newChannel); protected abstract void SendReply(Message reply, TInnerChannel channel, TItem item); } abstract class ReliableListenerOverDuplexSession : ReliableListenerOverSession where TChannel : class, IChannel where TReliableChannel : class, IChannel { protected ReliableListenerOverDuplexSession(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { this.FaultHelper = new SendFaultHelper(context.Binding.SendTimeout, context.Binding.CloseTimeout); } protected override IAsyncResult BeginTryReceiveItem(IDuplexSessionChannel channel, AsyncCallback callback, object state) { return channel.BeginTryReceive(TimeSpan.MaxValue, callback, channel); } protected override void DisposeItem(Message item) { ((IDisposable)item).Dispose(); } protected override void EndTryReceiveItem(IDuplexSessionChannel channel, IAsyncResult result, out Message item) { channel.EndTryReceive(result, out item); } protected override Message GetMessage(Message item) { return item; } protected override void SendReply(Message reply, IDuplexSessionChannel channel, Message item) { if (FaultHelper.AddressReply(item, reply)) channel.Send(reply); } } abstract class ReliableListenerOverReplySession : ReliableListenerOverSession where TChannel : class, IChannel where TReliableChannel : class, IChannel { protected ReliableListenerOverReplySession(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { this.FaultHelper = new ReplyFaultHelper(context.Binding.SendTimeout, context.Binding.CloseTimeout); } protected override IAsyncResult BeginTryReceiveItem(IReplySessionChannel channel, AsyncCallback callback, object state) { return channel.BeginTryReceiveRequest(TimeSpan.MaxValue, callback, channel); } protected override void DisposeItem(RequestContext item) { ((IDisposable)item.RequestMessage).Dispose(); ((IDisposable)item).Dispose(); } protected override void EndTryReceiveItem(IReplySessionChannel channel, IAsyncResult result, out RequestContext item) { channel.EndTryReceiveRequest(result, out item); } protected override Message GetMessage(RequestContext item) { return item.RequestMessage; } protected override void SendReply(Message reply, IReplySessionChannel channel, RequestContext item) { if (FaultHelper.AddressReply(item.RequestMessage, reply)) item.Reply(reply); } } class ReliableDuplexListenerOverDuplex : ReliableListenerOverDuplex { public ReliableDuplexListenerOverDuplex(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { } protected override bool Duplex { get { return true; } } protected override ServerReliableDuplexSessionChannel CreateChannel( UniqueId id, CreateSequenceInfo createSequenceInfo, IServerReliableChannelBinder binder) { binder.Open(this.InternalOpenTimeout); return new ServerReliableDuplexSessionChannel(this, binder, this.FaultHelper, id, createSequenceInfo.OfferIdentifier); } protected override void ProcessSequencedItem(ServerReliableDuplexSessionChannel channel, Message message, WsrmMessageInfo info) { channel.ProcessDemuxedMessage(info); } } class ReliableInputListenerOverDuplex : ReliableListenerOverDuplex { public ReliableInputListenerOverDuplex(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { } protected override bool Duplex { get { return false; } } protected override ReliableInputSessionChannelOverDuplex CreateChannel(UniqueId id, CreateSequenceInfo createSequenceInfo, IServerReliableChannelBinder binder) { binder.Open(this.InternalOpenTimeout); return new ReliableInputSessionChannelOverDuplex(this, binder, this.FaultHelper, id); } protected override void ProcessSequencedItem(ReliableInputSessionChannelOverDuplex channel, Message message, WsrmMessageInfo info) { channel.ProcessDemuxedMessage(info); } } class ReliableDuplexListenerOverDuplexSession : ReliableListenerOverDuplexSession { public ReliableDuplexListenerOverDuplexSession(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { } protected override bool Duplex { get { return true; } } protected override ServerReliableDuplexSessionChannel CreateChannel(UniqueId id, CreateSequenceInfo createSequenceInfo, IServerReliableChannelBinder binder) { binder.Open(this.InternalOpenTimeout); return new ServerReliableDuplexSessionChannel(this, binder, this.FaultHelper, id, createSequenceInfo.OfferIdentifier); } protected override void ProcessSequencedItem(IDuplexSessionChannel channel, Message message, ServerReliableDuplexSessionChannel reliableChannel, WsrmMessageInfo info, bool newChannel) { if(!newChannel) { IServerReliableChannelBinder binder = (IServerReliableChannelBinder)reliableChannel.Binder; if(!binder.UseNewChannel(channel)) { message.Close(); channel.Abort(); return; } } reliableChannel.ProcessDemuxedMessage(info); } } class ReliableInputListenerOverDuplexSession : ReliableListenerOverDuplexSession { public ReliableInputListenerOverDuplexSession(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { } protected override bool Duplex { get { return false; } } protected override ReliableInputSessionChannelOverDuplex CreateChannel(UniqueId id, CreateSequenceInfo createSequenceInfo, IServerReliableChannelBinder binder) { binder.Open(this.InternalOpenTimeout); return new ReliableInputSessionChannelOverDuplex(this, binder, this.FaultHelper, id); } protected override void ProcessSequencedItem(IDuplexSessionChannel channel, Message message, ReliableInputSessionChannelOverDuplex reliableChannel, WsrmMessageInfo info, bool newChannel) { if (!newChannel) { IServerReliableChannelBinder binder = reliableChannel.Binder; if (!binder.UseNewChannel(channel)) { message.Close(); channel.Abort(); return; } } reliableChannel.ProcessDemuxedMessage(info); } } class ReliableInputListenerOverReply : ReliableListenerOverReply { public ReliableInputListenerOverReply(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { } protected override bool Duplex { get { return false; } } protected override ReliableInputSessionChannelOverReply CreateChannel(UniqueId id, CreateSequenceInfo createSequenceInfo, IServerReliableChannelBinder binder) { binder.Open(this.InternalOpenTimeout); return new ReliableInputSessionChannelOverReply(this, binder, this.FaultHelper, id); } protected override void ProcessSequencedItem(ReliableInputSessionChannelOverReply reliableChannel, RequestContext context, WsrmMessageInfo info) { reliableChannel.ProcessDemuxedRequest(reliableChannel.Binder.WrapRequestContext(context), info); } } class ReliableReplyListenerOverReply : ReliableListenerOverReply { public ReliableReplyListenerOverReply(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { } protected override bool Duplex { get { return true; } } protected override ReliableReplySessionChannel CreateChannel(UniqueId id, CreateSequenceInfo createSequenceInfo, IServerReliableChannelBinder binder) { binder.Open(this.InternalOpenTimeout); return new ReliableReplySessionChannel(this, binder, this.FaultHelper, id, createSequenceInfo.OfferIdentifier); } protected override void ProcessSequencedItem(ReliableReplySessionChannel reliableChannel, RequestContext context, WsrmMessageInfo info) { reliableChannel.ProcessDemuxedRequest(reliableChannel.Binder.WrapRequestContext(context), info); } } class ReliableInputListenerOverReplySession : ReliableListenerOverReplySession { public ReliableInputListenerOverReplySession(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { } protected override bool Duplex { get { return false; } } protected override ReliableInputSessionChannelOverReply CreateChannel( UniqueId id, CreateSequenceInfo createSequenceInfo, IServerReliableChannelBinder binder) { binder.Open(this.InternalOpenTimeout); return new ReliableInputSessionChannelOverReply(this, binder, this.FaultHelper, id); } protected override void ProcessSequencedItem(IReplySessionChannel channel, RequestContext context, ReliableInputSessionChannelOverReply reliableChannel, WsrmMessageInfo info, bool newChannel) { if (!newChannel) { IServerReliableChannelBinder binder = reliableChannel.Binder; if (!binder.UseNewChannel(channel)) { context.RequestMessage.Close(); context.Abort(); channel.Abort(); return; } } reliableChannel.ProcessDemuxedRequest(reliableChannel.Binder.WrapRequestContext(context), info); } } class ReliableReplyListenerOverReplySession : ReliableListenerOverReplySession { public ReliableReplyListenerOverReplySession(ReliableSessionBindingElement binding, BindingContext context) : base(binding, context) { } protected override bool Duplex { get { return true; } } protected override ReliableReplySessionChannel CreateChannel(UniqueId id, CreateSequenceInfo createSequenceInfo, IServerReliableChannelBinder binder) { binder.Open(this.InternalOpenTimeout); return new ReliableReplySessionChannel(this, binder, this.FaultHelper, id, createSequenceInfo.OfferIdentifier); } protected override void ProcessSequencedItem(IReplySessionChannel channel, RequestContext context, ReliableReplySessionChannel reliableChannel, WsrmMessageInfo info, bool newChannel) { if (!newChannel) { IServerReliableChannelBinder binder = reliableChannel.Binder; if (!binder.UseNewChannel(channel)) { context.RequestMessage.Close(); context.Abort(); channel.Abort(); return; } } reliableChannel.ProcessDemuxedRequest(reliableChannel.Binder.WrapRequestContext(context), info); } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- SamlNameIdentifierClaimResource.cs
- EntityTypeEmitter.cs
- ForceCopyBuildProvider.cs
- Perspective.cs
- GlyphCache.cs
- BitmapEncoder.cs
- _ProxyRegBlob.cs
- WindowsPen.cs
- SystemResourceKey.cs
- DataPagerFieldCommandEventArgs.cs
- DataReaderContainer.cs
- SystemIPv4InterfaceProperties.cs
- TemplateXamlTreeBuilder.cs
- OptimisticConcurrencyException.cs
- SmtpTransport.cs
- ResourceAttributes.cs
- ContentElement.cs
- CacheSection.cs
- SqlWriter.cs
- ProjectionPathSegment.cs
- ImageDrawing.cs
- SchemaTableOptionalColumn.cs
- XPathSingletonIterator.cs
- BuildProviderUtils.cs
- COM2PictureConverter.cs
- MexNamedPipeBindingElement.cs
- ImageConverter.cs
- InkCanvasSelection.cs
- XmlUnspecifiedAttribute.cs
- Keyboard.cs
- ParentQuery.cs
- SessionIDManager.cs
- ContourSegment.cs
- HiddenField.cs
- Polyline.cs
- FillRuleValidation.cs
- Line.cs
- EventItfInfo.cs
- TcpStreams.cs
- BamlLocalizer.cs
- BindingListCollectionView.cs
- MultiPropertyDescriptorGridEntry.cs
- SqlDataSourceView.cs
- Debug.cs
- DataObject.cs
- designeractionlistschangedeventargs.cs
- DbProviderServices.cs
- HelpKeywordAttribute.cs
- SelectorAutomationPeer.cs
- FontSourceCollection.cs
- ResXFileRef.cs
- TemplateBindingExtension.cs
- TraceInternal.cs
- ClientData.cs
- CustomErrorCollection.cs
- OdbcCommand.cs
- TypeConverterMarkupExtension.cs
- WindowsListView.cs
- SqlClientWrapperSmiStreamChars.cs
- FileEnumerator.cs
- smtppermission.cs
- DocComment.cs
- SByteStorage.cs
- StreamGeometryContext.cs
- UpdatableWrapper.cs
- ExtendedPropertyCollection.cs
- TimeSpanValidator.cs
- NoClickablePointException.cs
- WindowsListViewItemStartMenu.cs
- WindowsImpersonationContext.cs
- TypedLocationWrapper.cs
- MimeMultiPart.cs
- StackSpiller.Temps.cs
- EventlogProvider.cs
- StreamGeometry.cs
- Decimal.cs
- DataGridRelationshipRow.cs
- LinkTarget.cs
- DSASignatureFormatter.cs
- AppDomain.cs
- HttpRequestWrapper.cs
- DataGridCellsPanel.cs
- ValidatorCompatibilityHelper.cs
- XPathAncestorIterator.cs
- KeyTime.cs
- ComboBoxRenderer.cs
- DataGridViewColumnConverter.cs
- MarkupExtensionReturnTypeAttribute.cs
- TextPointerBase.cs
- ElementNotEnabledException.cs
- KerberosTicketHashIdentifierClause.cs
- CultureInfoConverter.cs
- XsdCachingReader.cs
- AttributeSetAction.cs
- ipaddressinformationcollection.cs
- KoreanLunisolarCalendar.cs
- AssemblyAttributesGoHere.cs
- XamlGridLengthSerializer.cs
- InkCanvasSelectionAdorner.cs
- ControlCodeDomSerializer.cs