Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / OneWayChannelListener.cs / 1 / OneWayChannelListener.cs
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//-----------------------------------------------------------
namespace System.ServiceModel.Channels
{
using System.Collections.Generic;
using System.ServiceModel;
using System.Diagnostics;
using System.IO;
using System.Runtime.Serialization;
using System.ServiceModel.Diagnostics;
using System.Text;
using System.Threading;
using System.ServiceModel.Description;
using System.Xml;
///
/// Wraps an IChannelListener into an IChannelListener
///
class ReplyOneWayChannelListener
: LayeredChannelListener
{
IChannelListener innerChannelListener;
bool packetRoutable;
public ReplyOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context)
: base(context.Binding, context.BuildInnerChannelListener())
{
this.packetRoutable = bindingElement.PacketRoutable;
}
protected override void OnOpening()
{
this.innerChannelListener = (IChannelListener)this.InnerChannelListener;
base.OnOpening();
}
protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
{
IReplyChannel innerChannel = this.innerChannelListener.AcceptChannel(timeout);
return WrapInnerChannel(innerChannel);
}
protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
{
return this.innerChannelListener.BeginAcceptChannel(timeout, callback, state);
}
protected override IInputChannel OnEndAcceptChannel(IAsyncResult result)
{
IReplyChannel innerChannel = this.innerChannelListener.EndAcceptChannel(result);
return WrapInnerChannel(innerChannel);
}
protected override bool OnWaitForChannel(TimeSpan timeout)
{
return this.innerChannelListener.WaitForChannel(timeout);
}
protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
{
return this.innerChannelListener.BeginWaitForChannel(timeout, callback, state);
}
protected override bool OnEndWaitForChannel(IAsyncResult result)
{
return this.innerChannelListener.EndWaitForChannel(result);
}
IInputChannel WrapInnerChannel(IReplyChannel innerChannel)
{
if (innerChannel == null)
{
return null;
}
else
{
return new ReplyOneWayInputChannel(this, innerChannel);
}
}
class ReplyOneWayInputChannel : LayeredChannel, IInputChannel
{
bool validateHeader;
public ReplyOneWayInputChannel(ReplyOneWayChannelListener listener, IReplyChannel innerChannel)
: base(listener, innerChannel)
{
this.validateHeader = listener.packetRoutable;
}
public EndpointAddress LocalAddress
{
get { return this.InnerChannel.LocalAddress; }
}
Message ProcessContext(RequestContext context, TimeSpan timeout)
{
if (context == null)
{
return null;
}
bool replySuccess = false;
Message result = null;
try
{
// validate that the request message contains our expected header
result = context.RequestMessage;
result.Properties.Add(RequestContextMessageProperty.Name, new RequestContextMessageProperty(context));
if (this.validateHeader)
{
PacketRoutableHeader.ValidateMessage(result);
}
try
{
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
context.Reply(null, timeoutHelper.RemainingTime());
replySuccess = true;
}
catch (CommunicationException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
catch (TimeoutException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
}
finally
{
if (!replySuccess)
{
context.Abort();
if (result != null)
{
result.Close();
result = null;
}
}
}
return result;
}
public Message Receive()
{
return this.Receive(this.DefaultReceiveTimeout);
}
public Message Receive(TimeSpan timeout)
{
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
RequestContext context = InnerChannel.ReceiveRequest(timeoutHelper.RemainingTime());
return ProcessContext(context, timeoutHelper.RemainingTime());
}
public IAsyncResult BeginReceive(AsyncCallback callback, object state)
{
return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
}
public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
{
return new ReceiveAsyncResult(this.InnerChannel, timeout, this.validateHeader, callback, state);
}
public Message EndReceive(IAsyncResult result)
{
return ReceiveAsyncResult.End(result);
}
public bool TryReceive(TimeSpan timeout, out Message message)
{
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
RequestContext context;
if (InnerChannel.TryReceiveRequest(timeoutHelper.RemainingTime(), out context))
{
message = ProcessContext(context, timeoutHelper.RemainingTime());
return true;
}
else
{
message = null;
return false;
}
}
public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
{
return new TryReceiveAsyncResult(this.InnerChannel, timeout, this.validateHeader, callback, state);
}
public bool EndTryReceive(IAsyncResult result, out Message message)
{
return TryReceiveAsyncResult.End(result, out message);
}
public bool WaitForMessage(TimeSpan timeout)
{
return InnerChannel.WaitForRequest(timeout);
}
public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
{
return InnerChannel.BeginWaitForRequest(timeout, callback, state);
}
public bool EndWaitForMessage(IAsyncResult result)
{
return InnerChannel.EndWaitForRequest(result);
}
class TryReceiveAsyncResult : ReceiveAsyncResultBase
{
bool tryResult;
public TryReceiveAsyncResult(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader,
AsyncCallback callback, object state)
: base(innerChannel, timeout, validateHeader, callback, state)
{
}
public static bool End(IAsyncResult result, out Message message)
{
TryReceiveAsyncResult thisPtr = AsyncResult.End(result);
message = thisPtr.Message;
return thisPtr.tryResult;
}
protected override IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
{
return InnerChannel.BeginTryReceiveRequest(timeout, callback, state);
}
protected override RequestContext OnEndReceiveRequest(IAsyncResult result)
{
RequestContext context;
this.tryResult = InnerChannel.EndTryReceiveRequest(result, out context);
return context;
}
}
class ReceiveAsyncResult : ReceiveAsyncResultBase
{
public ReceiveAsyncResult(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader,
AsyncCallback callback, object state)
: base(innerChannel, timeout, validateHeader, callback, state)
{
}
public static Message End(IAsyncResult result)
{
ReceiveAsyncResult thisPtr = AsyncResult.End(result);
return thisPtr.Message;
}
protected override IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
{
return InnerChannel.BeginReceiveRequest(timeout, callback, state);
}
protected override RequestContext OnEndReceiveRequest(IAsyncResult result)
{
return InnerChannel.EndReceiveRequest(result);
}
}
abstract class ReceiveAsyncResultBase : AsyncResult
{
IReplyChannel innerChannel;
RequestContext context;
Message message;
TimeoutHelper timeoutHelper;
bool validateHeader;
static AsyncCallback onReceiveRequest = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnReceiveRequest));
static AsyncCallback onReply = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnReply));
protected ReceiveAsyncResultBase(IReplyChannel innerChannel, TimeSpan timeout, bool validateHeader,
AsyncCallback callback, object state)
: base(callback, state)
{
this.innerChannel = innerChannel;
this.timeoutHelper = new TimeoutHelper(timeout);
this.validateHeader = validateHeader;
IAsyncResult result = this.OnBeginReceiveRequest(timeoutHelper.RemainingTime(), onReceiveRequest, this);
if (!result.CompletedSynchronously)
{
return;
}
if (HandleReceiveRequestComplete(result))
{
base.Complete(true);
}
}
protected IReplyChannel InnerChannel
{
get
{
return this.innerChannel;
}
}
protected Message Message
{
get
{
return this.message;
}
}
protected abstract IAsyncResult OnBeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state);
protected abstract RequestContext OnEndReceiveRequest(IAsyncResult result);
bool HandleReplyComplete(IAsyncResult result)
{
bool abortContext = true;
try
{
context.EndReply(result);
abortContext = false;
}
catch (CommunicationException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
catch (TimeoutException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
finally
{
if (abortContext)
{
context.Abort();
}
}
return true;
}
bool HandleReceiveRequestComplete(IAsyncResult result)
{
this.context = this.OnEndReceiveRequest(result);
if (this.context == null)
{
return true;
}
bool replySuccess = false;
IAsyncResult replyResult = null;
try
{
this.message = context.RequestMessage;
this.message.Properties.Add(RequestContextMessageProperty.Name, new RequestContextMessageProperty(context));
if (validateHeader)
{
PacketRoutableHeader.ValidateMessage(this.message);
}
try
{
replyResult = context.BeginReply(null, timeoutHelper.RemainingTime(), onReply, this);
replySuccess = true;
}
catch (CommunicationException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
catch (TimeoutException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
}
finally
{
if (!replySuccess)
{
this.context.Abort();
if (this.message != null)
{
this.message.Close();
this.message = null;
}
}
}
if (replyResult == null)
{
return true;
}
else if (replyResult.CompletedSynchronously)
{
return HandleReplyComplete(replyResult);
}
else
{
return false;
}
}
static void OnReceiveRequest(IAsyncResult result)
{
if (result.CompletedSynchronously)
{
return;
}
ReceiveAsyncResultBase thisPtr = (ReceiveAsyncResultBase)result.AsyncState;
Exception completionException = null;
bool completeSelf;
try
{
completeSelf = thisPtr.HandleReceiveRequestComplete(result);
}
#pragma warning suppress 56500 // [....], transferring exception to another thread
catch (Exception e)
{
if (DiagnosticUtility.IsFatal(e))
{
throw;
}
completeSelf = true;
completionException = e;
}
if (completeSelf)
{
thisPtr.Complete(false, completionException);
}
}
static void OnReply(IAsyncResult result)
{
if (result.CompletedSynchronously)
{
return;
}
ReceiveAsyncResultBase thisPtr = (ReceiveAsyncResultBase)result.AsyncState;
Exception completionException = null;
bool completeSelf;
try
{
completeSelf = thisPtr.HandleReplyComplete(result);
}
#pragma warning suppress 56500 // [....], transferring exception to another thread
catch (Exception e)
{
if (DiagnosticUtility.IsFatal(e))
{
throw;
}
completeSelf = true;
completionException = e;
}
if (completeSelf)
{
thisPtr.Complete(false, completionException);
}
}
}
}
}
///
/// Wraps an IChannelListener into an IChannelListener
///
class DuplexSessionOneWayChannelListener
: DelegatingChannelListener
{
IChannelListener innerChannelListener;
DuplexSessionOneWayInputChannelAcceptor inputChannelAcceptor;
bool packetRoutable;
int maxAcceptedChannels;
bool acceptPending;
int activeChannels;
TimeSpan idleTimeout;
static AsyncCallback onAcceptInnerChannel = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnAcceptInnerChannel));
AsyncCallback onOpenInnerChannel;
EventHandler onInnerChannelClosed;
ItemDequeuedCallback onExceptionDequeued;
WaitCallback handleAcceptCallback;
bool ownsInnerListener;
object acceptLock;
public DuplexSessionOneWayChannelListener(OneWayBindingElement bindingElement, BindingContext context)
: base(true, context.Binding, context.BuildInnerChannelListener())
{
this.acceptLock = new object();
this.inputChannelAcceptor = new DuplexSessionOneWayInputChannelAcceptor(this);
this.packetRoutable = bindingElement.PacketRoutable;
this.maxAcceptedChannels = bindingElement.MaxAcceptedChannels;
this.Acceptor = this.inputChannelAcceptor;
this.idleTimeout = bindingElement.ChannelPoolSettings.IdleTimeout;
this.onOpenInnerChannel = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnOpenInnerChannel));
this.ownsInnerListener = true;
this.onInnerChannelClosed = new EventHandler(OnInnerChannelClosed);
}
bool IsAcceptNecessary
{
get
{
return !acceptPending
&& (activeChannels < maxAcceptedChannels)
&& (this.innerChannelListener.State == CommunicationState.Opened);
}
}
protected override void OnOpening()
{
this.innerChannelListener = (IChannelListener)this.InnerChannelListener;
this.inputChannelAcceptor.TransferInnerChannelListener(this.innerChannelListener); // acceptor now owns the lifetime
this.ownsInnerListener = false;
base.OnOpening();
}
protected override void OnOpened()
{
base.OnOpened();
IOThreadScheduler.ScheduleCallback(new WaitCallback(AcceptLoop), null);
}
protected override void OnAbort()
{
base.OnAbort();
if (this.ownsInnerListener && this.innerChannelListener != null) // Open didn't complete
{
this.innerChannelListener.Abort();
}
}
void AcceptLoop(object state)
{
AcceptLoop(null);
}
// we need to kick off an accept (and possibly process a completion as well)
void AcceptLoop(IAsyncResult pendingResult)
{
IDuplexSessionChannel pendingChannel = null;
if (pendingResult != null)
{
if (!ProcessEndAccept(pendingResult, out pendingChannel))
{
return;
}
pendingResult = null;
}
lock (acceptLock)
{
while (IsAcceptNecessary)
{
Exception exceptionToEnqueue = null;
try
{
IAsyncResult result = null;
try
{
result = this.innerChannelListener.BeginAcceptChannel(TimeSpan.MaxValue, onAcceptInnerChannel, this);
}
catch (CommunicationException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
continue;
}
acceptPending = true;
if (!result.CompletedSynchronously)
{
break;
}
if (this.handleAcceptCallback == null)
{
this.handleAcceptCallback = new WaitCallback(HandleAcceptCallback);
}
if (pendingChannel != null)
{
// don't starve our completed Accept
IOThreadScheduler.ScheduleCallback(handleAcceptCallback, pendingChannel);
pendingChannel = null;
}
IDuplexSessionChannel channel = null;
if (ProcessEndAccept(result, out channel))
{
if (channel != null)
{
IOThreadScheduler.ScheduleCallback(handleAcceptCallback, channel);
}
}
else
{
return;
}
}
#pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user
catch (Exception e)
{
if (DiagnosticUtility.IsFatal(e))
{
throw;
}
exceptionToEnqueue = e;
}
if (exceptionToEnqueue != null)
{
this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null, false);
}
}
}
if (pendingChannel != null)
{
HandleAcceptComplete(pendingChannel);
}
}
// return true if the loop should continue
bool ProcessEndAccept(IAsyncResult result, out IDuplexSessionChannel channel)
{
channel = null;
Exception exceptionToEnqueue = null;
bool success = false;
try
{
channel = innerChannelListener.EndAcceptChannel(result);
success = true;
}
catch (CommunicationException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
#pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user
catch (Exception e)
{
if (DiagnosticUtility.IsFatal(e))
{
throw;
}
exceptionToEnqueue = e;
}
if (success)
{
if (channel != null)
{
channel.Closed += this.onInnerChannelClosed;
bool traceMaxInboundChannels = false;
lock (acceptLock)
{
this.acceptPending = false;
activeChannels++;
if (activeChannels >= maxAcceptedChannels)
{
traceMaxInboundChannels = true;
}
}
if (DiagnosticUtility.ShouldTraceWarning)
{
if (traceMaxInboundChannels)
{
TraceUtility.TraceEvent(TraceEventType.Warning,
TraceCode.MaxAcceptedChannelsReached,
new StringTraceRecord("MaxAcceptedChannels", maxAcceptedChannels.ToString(System.Globalization.CultureInfo.InvariantCulture)),
this,
null);
}
}
}
else
{
// we're at EOF. close up the Acceptor and break out of our loop
this.inputChannelAcceptor.Close();
return false;
}
}
else if (exceptionToEnqueue != null)
{
// see what the state of the inner listener is. If it's still open, don't block the accept loop
bool canDispatchOnThisThread = (innerChannelListener.State != CommunicationState.Opened);
if (this.onExceptionDequeued == null)
{
this.onExceptionDequeued = new ItemDequeuedCallback(OnExceptionDequeued);
}
this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, this.onExceptionDequeued, canDispatchOnThisThread);
}
else
{
lock (acceptLock)
{
this.acceptPending = false;
}
}
return true;
}
void OnExceptionDequeued()
{
lock (acceptLock)
{
this.acceptPending = false;
}
AcceptLoop(null);
}
static void OnAcceptInnerChannel(IAsyncResult result)
{
if (result.CompletedSynchronously)
{
return;
}
DuplexSessionOneWayChannelListener thisPtr = (DuplexSessionOneWayChannelListener)result.AsyncState;
thisPtr.AcceptLoop(result);
}
void HandleAcceptCallback(object state)
{
this.HandleAcceptComplete((IDuplexSessionChannel)state);
}
void OnInnerChannelClosed(object sender, EventArgs e)
{
// Reduce our quota and kick off an accept
IDuplexSessionChannel channel = (IDuplexSessionChannel)sender;
channel.Closed -= this.onInnerChannelClosed;
lock (acceptLock)
{
activeChannels--;
}
this.AcceptLoop(null);
}
void HandleAcceptComplete(IDuplexSessionChannel channel)
{
Exception exceptionToEnqueue = null;
bool success = false;
this.inputChannelAcceptor.PrepareChannel(channel);
IAsyncResult openResult = null;
try
{
openResult = channel.BeginOpen(this.idleTimeout, onOpenInnerChannel, channel);
success = true;
}
catch (CommunicationException e) // consume CommunicationException
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
catch (TimeoutException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
#pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user
catch (Exception e)
{
if (DiagnosticUtility.IsFatal(e))
{
throw;
}
exceptionToEnqueue = e;
}
finally
{
if (!success && channel != null)
{
channel.Abort();
}
}
if (success)
{
if (openResult.CompletedSynchronously)
{
CompleteOpen(channel, openResult);
}
}
else
{
if (exceptionToEnqueue != null)
{
this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null);
}
}
}
void OnOpenInnerChannel(IAsyncResult result)
{
if (result.CompletedSynchronously)
{
return;
}
IDuplexSessionChannel channel = (IDuplexSessionChannel)result.AsyncState;
CompleteOpen(channel, result);
}
// open channel and start receiving messages
void CompleteOpen(IDuplexSessionChannel channel, IAsyncResult result)
{
Exception exceptionToEnqueue = null;
bool success = false;
try
{
channel.EndOpen(result);
success = true;
}
catch (CommunicationException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
catch (TimeoutException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
#pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user
catch (Exception e)
{
if (DiagnosticUtility.IsFatal(e))
{
throw;
}
exceptionToEnqueue = e;
}
finally
{
if (!success)
{
channel.Abort();
}
}
if (success)
{
this.inputChannelAcceptor.AcceptInnerChannel(this, channel);
}
else if (exceptionToEnqueue != null)
{
this.inputChannelAcceptor.Enqueue(exceptionToEnqueue, null);
}
}
class DuplexSessionOneWayInputChannelAcceptor : InputChannelAcceptor
{
ChannelTracker receivers;
IChannelListener innerChannelListener;
public DuplexSessionOneWayInputChannelAcceptor(DuplexSessionOneWayChannelListener listener)
: base(listener)
{
this.receivers = new ChannelTracker();
}
public void TransferInnerChannelListener(IChannelListener innerChannelListener)
{
DiagnosticUtility.DebugAssert(this.innerChannelListener == null, "innerChannelListener must be null prior to transfer");
bool abortListener = false;
lock (ThisLock)
{
this.innerChannelListener = innerChannelListener;
if (this.State == CommunicationState.Closing || this.State == CommunicationState.Closed)
{
// abort happened before we completed the transfer
abortListener = true;
}
}
if (abortListener)
{
innerChannelListener.Abort();
}
}
public void AcceptInnerChannel(DuplexSessionOneWayChannelListener listener, IDuplexSessionChannel channel)
{
ChannelReceiver channelReceiver = new ChannelReceiver(listener, channel);
this.receivers.Add(channel, channelReceiver);
channelReceiver.StartReceiving();
}
public void PrepareChannel(IDuplexSessionChannel channel)
{
this.receivers.PrepareChannel(channel);
}
protected override InputChannel OnCreateChannel()
{
return new DuplexSessionOneWayInputChannel(this.ChannelManager, null);
}
protected override void OnOpen(TimeSpan timeout)
{
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
base.OnOpen(timeoutHelper.RemainingTime());
this.receivers.Open(timeoutHelper.RemainingTime());
this.innerChannelListener.Open(timeoutHelper.RemainingTime());
}
protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
{
return new ChainedOpenAsyncResult(timeout, callback, state, base.OnBeginOpen, base.OnEndOpen, this.receivers, this.innerChannelListener);
}
protected override void OnEndOpen(IAsyncResult result)
{
ChainedOpenAsyncResult.End(result);
}
protected override void OnAbort()
{
base.OnAbort();
if (!TransferReceivers())
{
this.receivers.Abort();
if (this.innerChannelListener != null)
{
this.innerChannelListener.Abort();
}
}
}
protected override void OnClose(TimeSpan timeout)
{
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
base.OnClose(timeoutHelper.RemainingTime());
if (!TransferReceivers())
{
this.receivers.Close(timeoutHelper.RemainingTime());
this.innerChannelListener.Close(timeoutHelper.RemainingTime());
}
}
protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
{
List objectsToClose = new List();
if (!TransferReceivers())
{
objectsToClose.Add(this.receivers);
objectsToClose.Add(this.innerChannelListener);
}
return new ChainedCloseAsyncResult(timeout, callback, state, base.OnBeginClose, base.OnEndClose, objectsToClose);
}
protected override void OnEndClose(IAsyncResult result)
{
ChainedCloseAsyncResult.End(result);
}
// used to decouple our channel and listener lifetimes
bool TransferReceivers()
{
DuplexSessionOneWayInputChannel singletonChannel = (DuplexSessionOneWayInputChannel)base.GetCurrentChannel();
if (singletonChannel == null)
{
return false;
}
else
{
return singletonChannel.TransferReceivers(this.receivers, this.innerChannelListener);
}
}
class DuplexSessionOneWayInputChannel : InputChannel
{
ChannelTracker receivers;
IChannelListener innerChannelListener;
public DuplexSessionOneWayInputChannel(ChannelManagerBase channelManager, EndpointAddress localAddress)
: base(channelManager, localAddress)
{
}
public bool TransferReceivers(ChannelTracker receivers,
IChannelListener innerChannelListener)
{
lock (ThisLock)
{
if (this.State != CommunicationState.Opened)
{
return false;
}
this.receivers = receivers;
this.innerChannelListener = innerChannelListener;
return true;
}
}
protected override void OnAbort()
{
if (this.receivers != null)
{
DiagnosticUtility.DebugAssert(this.innerChannelListener != null, "innerChannelListener and receiver should both be null or non-null");
this.receivers.Abort();
this.innerChannelListener.Abort();
}
base.OnAbort();
}
protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
{
List objectsToClose = new List();
if (this.receivers != null)
{
objectsToClose.Add(this.receivers);
objectsToClose.Add(this.innerChannelListener);
}
return new ChainedCloseAsyncResult(timeout, callback, state, base.OnBeginClose, base.OnEndClose, objectsToClose);
}
protected override void OnEndClose(IAsyncResult result)
{
ChainedCloseAsyncResult.End(result);
}
protected override void OnClose(TimeSpan timeout)
{
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
if (this.receivers != null)
{
DiagnosticUtility.DebugAssert(this.innerChannelListener != null, "innerChannelListener and receiver should both be null or non-null");
this.receivers.Close(timeoutHelper.RemainingTime());
this.innerChannelListener.Close(timeoutHelper.RemainingTime());
}
base.OnClose(timeoutHelper.RemainingTime());
}
}
}
// given an inner channel, pulls messages off of it and enqueues them into the upper channel
class ChannelReceiver
{
ItemDequeuedCallback onMessageDequeued;
static AsyncCallback onReceive = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnReceive));
DuplexSessionOneWayInputChannelAcceptor acceptor;
IDuplexSessionChannel channel;
TimeSpan idleTimeout;
static WaitCallback startReceivingCallback;
WaitCallback onStartReceiveLater;
WaitCallback onDispatchItemsLater;
bool validateHeader;
public ChannelReceiver(DuplexSessionOneWayChannelListener parent, IDuplexSessionChannel channel)
{
this.channel = channel;
this.acceptor = parent.inputChannelAcceptor;
this.idleTimeout = parent.idleTimeout;
this.validateHeader = parent.packetRoutable;
this.onMessageDequeued = new ItemDequeuedCallback(OnMessageDequeued);
}
void StartReceivingCallback(object state)
{
((ChannelReceiver)state).StartReceiving();
}
public void StartReceiving()
{
Exception exceptionToEnqueue = null;
while (true)
{
if (channel.State != CommunicationState.Opened)
{
channel.Abort();
break;
}
IAsyncResult result = null;
try
{
result = this.channel.BeginTryReceive(this.idleTimeout, onReceive, this);
}
catch (CommunicationException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
#pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user
catch (Exception e)
{
if (DiagnosticUtility.IsFatal(e))
{
throw;
}
exceptionToEnqueue = e;
break;
}
if (result != null)
{
if (!result.CompletedSynchronously)
{
break;
}
bool dispatch;
bool continueLoop = OnCompleteReceive(result, out dispatch);
if (dispatch)
{
Dispatch();
}
if (!continueLoop)
{
break;
}
}
}
if (exceptionToEnqueue != null)
{
this.acceptor.Enqueue(exceptionToEnqueue, this.onMessageDequeued);
}
}
bool EnqueueMessage(Message message)
{
if (this.validateHeader)
{
if (!PacketRoutableHeader.TryValidateMessage(message))
{
this.channel.Abort();
message.Close();
return false;
}
else
{
this.validateHeader = false; // only validate the first message on a session
}
}
return this.acceptor.EnqueueWithoutDispatch(message, this.onMessageDequeued);
}
void OnStartReceiveLater(object state)
{
StartReceiving();
}
void OnDispatchItemsLater(object state)
{
Dispatch();
}
void Dispatch()
{
this.acceptor.DispatchItems();
}
// returns true if the Receive Loop should continue (or be started if it's not running)
bool OnCompleteReceive(IAsyncResult result, out bool dispatchLater)
{
Exception exceptionToEnqueue = null;
Message message = null;
bool startLoop = false;
dispatchLater = false;
try
{
if (!this.channel.EndTryReceive(result, out message))
{
this.channel.Abort(); // we've hit our IdleTimeout
}
else if (message == null)
{
this.channel.Close(); // read EOF, close our half of the session
}
}
catch (CommunicationException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
startLoop = (this.channel.State == CommunicationState.Opened);
}
catch (TimeoutException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
startLoop = (this.channel.State == CommunicationState.Opened);
}
#pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user
catch (Exception e)
{
if (DiagnosticUtility.IsFatal(e))
{
throw;
}
exceptionToEnqueue = e;
}
if (message != null)
{
dispatchLater = EnqueueMessage(message);
}
else if (exceptionToEnqueue != null)
{
dispatchLater = this.acceptor.EnqueueWithoutDispatch(exceptionToEnqueue, this.onMessageDequeued);
}
return startLoop;
}
void OnMessageDequeued()
{
IAsyncResult result = null;
Exception exceptionToEnqueue = null;
try
{
result = this.channel.BeginTryReceive(this.idleTimeout, onReceive, this);
}
catch (CommunicationException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
}
#pragma warning suppress 56500 // [....], transferring exception to input queue to be pulled off by user
catch (Exception e)
{
if (DiagnosticUtility.IsFatal(e))
{
throw;
}
exceptionToEnqueue = e;
}
if (result != null)
{
if (result.CompletedSynchronously)
{
bool dispatchLater;
if (OnCompleteReceive(result, out dispatchLater))
{
if (onStartReceiveLater == null)
{
onStartReceiveLater = new WaitCallback(OnStartReceiveLater);
}
IOThreadScheduler.ScheduleCallback(onStartReceiveLater, null);
}
if (dispatchLater)
{
if (onDispatchItemsLater == null)
{
onDispatchItemsLater = new WaitCallback(OnDispatchItemsLater);
}
IOThreadScheduler.ScheduleCallback(onDispatchItemsLater, null);
}
}
}
else if (exceptionToEnqueue != null)
{
this.acceptor.Enqueue(exceptionToEnqueue, this.onMessageDequeued, false);
}
else // need to kickoff a new loop
{
if (this.channel.State == CommunicationState.Opened)
{
if (startReceivingCallback == null)
{
startReceivingCallback = new WaitCallback(StartReceivingCallback);
}
IOThreadScheduler.ScheduleCallback(startReceivingCallback, this);
}
}
}
static void OnReceive(IAsyncResult result)
{
if (result.CompletedSynchronously)
{
return;
}
ChannelReceiver thisPtr = (ChannelReceiver)result.AsyncState;
bool dispatch;
if (thisPtr.OnCompleteReceive(result, out dispatch))
{
thisPtr.StartReceiving();
}
if (dispatch)
{
thisPtr.Dispatch();
}
}
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- Translator.cs
- CatalogPartCollection.cs
- XmlSchemaImporter.cs
- NativeMethods.cs
- PropertyPathWorker.cs
- OuterGlowBitmapEffect.cs
- SqlParameterCollection.cs
- ClearTypeHintValidation.cs
- KeyProperty.cs
- TimeSpanSecondsConverter.cs
- MethodAccessException.cs
- SectionXmlInfo.cs
- NamedPipeDuplicateContext.cs
- FontConverter.cs
- ConditionCollection.cs
- WindowsNonControl.cs
- XmlReader.cs
- NumberFormatInfo.cs
- WmfPlaceableFileHeader.cs
- DeliveryStrategy.cs
- ControlBuilder.cs
- WebPartConnectionsConfigureVerb.cs
- InternalsVisibleToAttribute.cs
- DiscoveryViaBehavior.cs
- AliasedSlot.cs
- Cursor.cs
- RSAOAEPKeyExchangeDeformatter.cs
- ParagraphVisual.cs
- PasswordTextContainer.cs
- TableItemPatternIdentifiers.cs
- ImageBrush.cs
- IxmlLineInfo.cs
- WebPartPersonalization.cs
- PropertySegmentSerializer.cs
- MergeFilterQuery.cs
- WeakHashtable.cs
- XmlSchemaAttributeGroupRef.cs
- TextTreeTextBlock.cs
- DataStorage.cs
- ControlParameter.cs
- WebPartVerbsEventArgs.cs
- InkCanvasSelection.cs
- WsatConfiguration.cs
- RuntimeVariableList.cs
- AddInContractAttribute.cs
- ElementNotAvailableException.cs
- ViewStateException.cs
- DataObject.cs
- WebPartTransformer.cs
- GacUtil.cs
- GridViewDeletedEventArgs.cs
- TrailingSpaceComparer.cs
- HashAlgorithm.cs
- AttributeEmitter.cs
- FileDialog_Vista_Interop.cs
- XmlNamedNodeMap.cs
- FlowLayout.cs
- ViewBox.cs
- DependencyPropertyHelper.cs
- TextLine.cs
- SystemIPAddressInformation.cs
- ColorIndependentAnimationStorage.cs
- NodeCounter.cs
- MsmqAppDomainProtocolHandler.cs
- BaseProcessProtocolHandler.cs
- IconConverter.cs
- PageWrapper.cs
- TreeIterator.cs
- IArgumentProvider.cs
- SimpleTextLine.cs
- RuntimeConfig.cs
- ExpressionServices.cs
- SelectionGlyph.cs
- DataGridViewCellCollection.cs
- DoubleAnimation.cs
- DataGridPageChangedEventArgs.cs
- _MultipleConnectAsync.cs
- DataGridViewBindingCompleteEventArgs.cs
- BuildProvider.cs
- ValueOfAction.cs
- PropertyDescriptor.cs
- WebServicesDescriptionAttribute.cs
- ServicePointManagerElement.cs
- FileSystemEventArgs.cs
- DispatcherProcessingDisabled.cs
- ObjectViewQueryResultData.cs
- OleDbConnection.cs
- BackStopAuthenticationModule.cs
- ReadOnlyHierarchicalDataSource.cs
- RangeEnumerable.cs
- FormViewPagerRow.cs
- CultureTableRecord.cs
- Typography.cs
- StrokeRenderer.cs
- dbenumerator.cs
- IItemContainerGenerator.cs
- OleDbPropertySetGuid.cs
- WorkerRequest.cs
- ResolveMatchesCD1.cs
- HtmlAnchor.cs