Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Dispatcher / DuplexChannelBinder.cs / 1 / DuplexChannelBinder.cs
//------------------------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------------------------- namespace System.ServiceModel.Dispatcher { using System; using System.ServiceModel; using System.ServiceModel.Security; using System.Collections; using System.Collections.Generic; using System.Threading; using System.ServiceModel.Channels; using System.Xml; using System.ServiceModel.Diagnostics; using System.Runtime.CompilerServices; class DuplexChannelBinder : IChannelBinder { IDuplexChannel channel; IRequestReplyCorrelator correlator; TimeSpan defaultCloseTimeout; TimeSpan defaultSendTimeout; bool faulted = false; IdentityVerifier identityVerifier; bool isSession; Uri listenUri; int pending; bool syncPumpEnabled; Listrequests; ChannelHandler channelHandler; internal DuplexChannelBinder(IDuplexChannel channel, IRequestReplyCorrelator correlator) : this(channel, correlator, null) { // empty } internal DuplexChannelBinder(IDuplexChannel channel, IRequestReplyCorrelator correlator, Uri listenUri) { if (!((channel != null))) { DiagnosticUtility.DebugAssert("DuplexChannelBinder.DuplexChannelBinder: (channel != null)"); throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("channel"); } if (!((correlator != null))) { DiagnosticUtility.DebugAssert("DuplexChannelBinder.DuplexChannelBinder: (correlator != null)"); throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("correlator"); } this.channel = channel; this.correlator = correlator; this.listenUri = listenUri; this.channel.Faulted += new EventHandler(OnFaulted); } internal DuplexChannelBinder(IDuplexSessionChannel channel, IRequestReplyCorrelator correlator) : this(channel, correlator, null) { // empty } internal DuplexChannelBinder(IDuplexSessionChannel channel, IRequestReplyCorrelator correlator, Uri listenUri) : this((IDuplexChannel)channel, correlator, listenUri) { this.isSession = true; } public IChannel Channel { get { return this.channel; } } public TimeSpan DefaultCloseTimeout { get { return this.defaultCloseTimeout; } set { this.defaultCloseTimeout = value; } } internal ChannelHandler ChannelHandler { get { if (!(this.channelHandler != null)) { DiagnosticUtility.DebugAssert("DuplexChannelBinder.ChannelHandler: (channelHandler != null)"); } return this.channelHandler; } set { if (!(this.channelHandler == null)) { DiagnosticUtility.DebugAssert("DuplexChannelBinder.ChannelHandler: (channelHandler == null)"); } this.channelHandler = value; } } public TimeSpan DefaultSendTimeout { get { return this.defaultSendTimeout; } set { this.defaultSendTimeout = value; } } public bool HasSession { get { return this.isSession; } } internal IdentityVerifier IdentityVerifier { get { if (this.identityVerifier == null) { this.identityVerifier = IdentityVerifier.CreateDefault(); } return this.identityVerifier; } set { if (value == null) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("value"); } this.identityVerifier = value; } } internal bool IsFaulted { get { return this.faulted; } } public Uri ListenUri { get { return this.listenUri; } } public EndpointAddress LocalAddress { get { return this.channel.LocalAddress; } } bool Pumping { get { if (this.syncPumpEnabled) return true; if (this.channelHandler != null && this.channelHandler.HasRegisterBeenCalled) return true; return false; } } public EndpointAddress RemoteAddress { get { return this.channel.RemoteAddress; } } List Requests { get { lock (this.ThisLock) { if (this.requests == null) this.requests = new List (); return this.requests; } } } object ThisLock { get { return this; } } void OnFaulted(object sender, EventArgs e) { //Some unhandled exception happened on the channel. //So close all pending requests so the callbacks (in case of async) //on the requests are called. this.AbortRequests(); } public void Abort() { this.channel.Abort(); this.AbortRequests(); } public void CloseAfterFault(TimeSpan timeout) { this.channel.Close(timeout); this.AbortRequests(); } void AbortRequests() { lock (this.ThisLock) { if (this.requests != null) { IDuplexRequest[] array = this.requests.ToArray(); foreach (IDuplexRequest request in array) request.Abort(); } this.requests = null; } } TimeoutException GetReceiveTimeoutException(TimeSpan timeout) { EndpointAddress address = this.channel.RemoteAddress ?? this.channel.LocalAddress; if (address != null) { return new TimeoutException(SR.GetString(SR.SFxRequestTimedOut2, address, timeout)); } else { return new TimeoutException(SR.GetString(SR.SFxRequestTimedOut1, timeout)); } } internal bool HandleRequestAsReply(Message message) { UniqueId relatesTo = null; try { relatesTo = message.Headers.RelatesTo; } catch (MessageHeaderException) { // ignore it } if (relatesTo == null) { return false; } else { return HandleRequestAsReplyCore(message); } } bool HandleRequestAsReplyCore(Message message) { IDuplexRequest request = correlator.Find (message, true); if (request != null) { request.GotReply(message); return true; } return false; } public void EnsurePumping() { lock (this.ThisLock) { if (!this.syncPumpEnabled) { if (!this.ChannelHandler.HasRegisterBeenCalled) ChannelHandler.Register(this.ChannelHandler); } } } public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) { if (this.channel.State == CommunicationState.Faulted) return new ChannelFaultedAsyncResult(callback, state); return this.channel.BeginTryReceive(timeout, callback, state); } public bool EndTryReceive(IAsyncResult result, out RequestContext requestContext) { ChannelFaultedAsyncResult channelFaultedResult = result as ChannelFaultedAsyncResult; if (channelFaultedResult != null) { this.AbortRequests(); requestContext = null; return true; } Message message; if (this.channel.EndTryReceive(result, out message)) { if (message != null) { requestContext = new DuplexRequestContext(this.channel, message, this); } else { this.AbortRequests(); requestContext = null; } return true; } else { requestContext = null; return false; } } public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state) { return this.channel.BeginSend(message, timeout, callback, state); } public void EndSend(IAsyncResult result) { this.channel.EndSend(result); } public void Send(Message message, TimeSpan timeout) { this.channel.Send(message, timeout); } public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state) { bool success = false; AsyncDuplexRequest duplexRequest = null; try { RequestReplyCorrelator.PrepareRequest(message); duplexRequest = new AsyncDuplexRequest(message, this, timeout, callback, state); lock (this.ThisLock) { this.RequestStarting(message, duplexRequest); } IAsyncResult result = this.channel.BeginSend(message, timeout, DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(this.SendCallback)), duplexRequest); if (result.CompletedSynchronously) duplexRequest.FinishedSend(result, true); EnsurePumping(); success = true; return duplexRequest; } finally { lock (this.ThisLock) { if (success) { duplexRequest.EnableCompletion(); } else { this.RequestCompleting(duplexRequest); } } } } public Message EndRequest(IAsyncResult result) { AsyncDuplexRequest duplexRequest = result as AsyncDuplexRequest; if (duplexRequest == null) throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentException(SR.GetString(SR.InvalidAsyncResult))); return duplexRequest.End(); } public bool TryReceive(TimeSpan timeout, out RequestContext requestContext) { if (this.channel.State == CommunicationState.Faulted) { this.AbortRequests(); requestContext = null; return true; } Message message; if (this.channel.TryReceive(timeout, out message)) { if (message != null) { requestContext = new DuplexRequestContext(this.channel, message, this); } else { this.AbortRequests(); requestContext = null; } return true; } else { requestContext = null; return false; } } public Message Request(Message message, TimeSpan timeout) { SyncDuplexRequest duplexRequest = null; bool optimized = false; RequestReplyCorrelator.PrepareRequest(message); lock (this.ThisLock) { if (!Pumping) { optimized = true; syncPumpEnabled = true; } if (!optimized) duplexRequest = new SyncDuplexRequest(this); this.RequestStarting(message, duplexRequest); } if (optimized) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); UniqueId messageId = message.Headers.MessageId; try { this.channel.Send(message, timeoutHelper.RemainingTime()); if (DiagnosticUtility.ShouldUseActivity && ServiceModelActivity.Current != null && ServiceModelActivity.Current.ActivityType == ActivityType.ProcessAction) { ServiceModelActivity.Current.Suspend(); } for (;;) { TimeSpan remaining = timeoutHelper.RemainingTime(); Message reply; if (!this.channel.TryReceive(remaining, out reply)) { throw TraceUtility.ThrowHelperError(this.GetReceiveTimeoutException(timeout), message); } if (reply == null) { this.AbortRequests(); return null; } if (reply.Headers.RelatesTo == messageId) { this.ThrowIfInvalidReplyIdentity(reply); return reply; } else if (!this.HandleRequestAsReply(reply)) { // SFx drops a message here if (DiagnosticUtility.ShouldTraceInformation) { EndpointDispatcher dispatcher = null; if (this.ChannelHandler != null && this.ChannelHandler.Channel != null) { dispatcher = this.ChannelHandler.Channel.EndpointDispatcher; } TraceUtility.TraceDroppedMessage(reply, dispatcher); } reply.Close(); } } } finally { lock (this.ThisLock) { this.RequestCompleting(null); syncPumpEnabled = false; if (this.pending > 0) EnsurePumping(); } } } else { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); this.channel.Send(message, timeoutHelper.RemainingTime()); EnsurePumping(); return duplexRequest.WaitForReply(timeoutHelper.RemainingTime()); } } // ASSUMPTION: ([....]) caller holds lock (this.mutex) void RequestStarting(Message message, IDuplexRequest request) { if (request != null) { this.Requests.Add(request); this.correlator.Add (message, request); } this.pending++; } // ASSUMPTION: ([....]) caller holds lock (this.mutex) void RequestCompleting(IDuplexRequest request) { this.pending--; if (this.pending == 0) { this.requests = null; } else if ((request != null) && (this.requests != null)) { this.requests.Remove(request); } } void SendCallback(IAsyncResult result) { AsyncDuplexRequest duplexRequest = result.AsyncState as AsyncDuplexRequest; if (!((duplexRequest != null))) { DiagnosticUtility.DebugAssert("DuplexChannelBinder.RequestCallback: (duplexRequest != null)"); } if (!result.CompletedSynchronously) duplexRequest.FinishedSend(result, false); } [MethodImpl(MethodImplOptions.NoInlining)] void EnsureIncomingIdentity(SecurityMessageProperty property, EndpointAddress address, Message reply) { this.IdentityVerifier.EnsureIncomingIdentity(address, property.ServiceSecurityContext.AuthorizationContext); } void ThrowIfInvalidReplyIdentity(Message reply) { if (!this.isSession) { SecurityMessageProperty property = reply.Properties.Security; EndpointAddress address = this.channel.RemoteAddress; if ((property != null) && (address != null)) { EnsureIncomingIdentity(property, address, reply); } } } public bool WaitForMessage(TimeSpan timeout) { return this.channel.WaitForMessage(timeout); } public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state) { return this.channel.BeginWaitForMessage(timeout, callback, state); } public bool EndWaitForMessage(IAsyncResult result) { return this.channel.EndWaitForMessage(result); } class DuplexRequestContext : RequestContextBase { DuplexChannelBinder binder; IDuplexChannel channel; internal DuplexRequestContext(IDuplexChannel channel, Message request, DuplexChannelBinder binder) : base(request, binder.DefaultCloseTimeout, binder.DefaultSendTimeout) { this.channel = channel; this.binder = binder; } protected override void OnAbort() { } protected override void OnClose(TimeSpan timeout) { } protected override void OnReply(Message message, TimeSpan timeout) { if (message != null) { this.channel.Send(message, timeout); } } protected override IAsyncResult OnBeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state) { return new ReplyAsyncResult(this, message, timeout, callback, state); } protected override void OnEndReply(IAsyncResult result) { ReplyAsyncResult.End(result); } class ReplyAsyncResult : AsyncResult { static AsyncCallback onSend; DuplexRequestContext context; public ReplyAsyncResult(DuplexRequestContext context, Message message, TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { if (message != null) { if (onSend == null) { onSend = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnSend)); } this.context = context; IAsyncResult result = context.channel.BeginSend(message, timeout, onSend, this); if (!result.CompletedSynchronously) { return; } context.channel.EndSend(result); } base.Complete(true); } public static void End(IAsyncResult result) { AsyncResult.End (result); } static void OnSend(IAsyncResult result) { if (result.CompletedSynchronously) { return; } Exception completionException = null; ReplyAsyncResult thisPtr = (ReplyAsyncResult)result.AsyncState; try { thisPtr.context.channel.EndSend(result); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception exception) { if (DiagnosticUtility.IsFatal(exception)) { throw; } completionException = exception; } thisPtr.Complete(false, completionException); } } } interface IDuplexRequest { void Abort(); void GotReply(Message reply); } class SyncDuplexRequest : IDuplexRequest { Message reply; DuplexChannelBinder parent; ManualResetEvent wait = new ManualResetEvent(false); int waitCount = 0; internal SyncDuplexRequest(DuplexChannelBinder parent) { this.parent = parent; } public void Abort() { this.wait.Set(); } internal Message WaitForReply(TimeSpan timeout) { try { if (!TimeoutHelper.WaitOne(this.wait, timeout, false)) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.parent.GetReceiveTimeoutException(timeout)); } } finally { this.CloseWaitHandle(); } this.parent.ThrowIfInvalidReplyIdentity(this.reply); return this.reply; } public void GotReply(Message reply) { lock (this.parent.ThisLock) { this.parent.RequestCompleting(this); } this.reply = reply; this.wait.Set(); this.CloseWaitHandle(); } void CloseWaitHandle() { if (Interlocked.Increment(ref this.waitCount) == 2) { this.wait.Close(); } } } class AsyncDuplexRequest : AsyncResult, IDuplexRequest { static WaitCallback timerCallback = new WaitCallback(AsyncDuplexRequest.TimerCallback); bool aborted; bool enableComplete; bool gotReply; Exception sendException; IAsyncResult sendResult; DuplexChannelBinder parent; Message reply; bool timedOut; TimeSpan timeout; IOThreadTimer timer; ServiceModelActivity activity; internal AsyncDuplexRequest(Message message, DuplexChannelBinder parent, TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { this.parent = parent; this.timeout = timeout; if (timeout != TimeSpan.MaxValue) { this.timer = new IOThreadTimer(AsyncDuplexRequest.timerCallback, this, true); this.timer.Set(timeout); } if (DiagnosticUtility.ShouldUseActivity) { this.activity = TraceUtility.ExtractActivity(message); } } bool IsDone { get { if (!this.enableComplete) { return false; } return (((this.sendResult != null) && this.gotReply) || (this.sendException != null) || this.timedOut || this.aborted); } } public void Abort() { bool done; lock (this.parent.ThisLock) { bool wasDone = this.IsDone; this.aborted = true; done = !wasDone && this.IsDone; } if (done) this.Done(false); } void Done(bool completedSynchronously) { // Make sure that we are acting on the Reply activity. ServiceModelActivity replyActivity = DiagnosticUtility.ShouldUseActivity ? TraceUtility.ExtractActivity(this.reply) : null; using (ServiceModelActivity.BoundOperation(replyActivity)) { if (this.timer != null) { this.timer.Cancel(); this.timer = null; } lock (this.parent.ThisLock) { this.parent.RequestCompleting(this); } if (this.sendException != null) this.Complete(completedSynchronously, this.sendException); else if (this.timedOut) this.Complete(completedSynchronously, this.parent.GetReceiveTimeoutException(this.timeout)); else this.Complete(completedSynchronously); } } public void EnableCompletion() { bool done; lock (this.parent.ThisLock) { bool wasDone = this.IsDone; this.enableComplete = true; done = !wasDone && this.IsDone; } if (done) this.Done(true); } public void FinishedSend(IAsyncResult sendResult, bool completedSynchronously) { Exception sendException = null; try { this.parent.channel.EndSend(sendResult); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; sendException = e; } bool done; lock (this.parent.ThisLock) { bool wasDone = this.IsDone; this.sendResult = sendResult; this.sendException = sendException; done = !wasDone && this.IsDone; } if (done) this.Done(completedSynchronously); } internal Message End() { AsyncResult.End (this); this.parent.ThrowIfInvalidReplyIdentity(this.reply); return this.reply; } public void GotReply(Message reply) { bool done; ServiceModelActivity replyActivity = DiagnosticUtility.ShouldUseActivity ? TraceUtility.ExtractActivity(reply) : null; using (ServiceModelActivity.BoundOperation(replyActivity)) { lock (this.parent.ThisLock) { bool wasDone = this.IsDone; this.reply = reply; this.gotReply = true; done = !wasDone && this.IsDone; } if (replyActivity != null && DiagnosticUtility.ShouldUseActivity) { TraceUtility.SetActivity(reply, this.activity); if (DiagnosticUtility.ShouldUseActivity && this.activity != null) { DiagnosticUtility.DiagnosticTrace.TraceTransfer(this.activity.Id); } } } if (DiagnosticUtility.ShouldUseActivity && replyActivity != null) { replyActivity.Stop(); } if (done) this.Done(false); } void TimedOut() { bool done; lock (this.parent.ThisLock) { bool wasDone = this.IsDone; this.timedOut = true; done = !wasDone && this.IsDone; } if (done) this.Done(false); } static void TimerCallback(object state) { ((AsyncDuplexRequest)state).TimedOut(); } } class ChannelFaultedAsyncResult : CompletedAsyncResult { public ChannelFaultedAsyncResult(AsyncCallback callback, object state) : base(callback, state) { } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- DataGridViewCell.cs
- XmlNodeComparer.cs
- KeyValueConfigurationCollection.cs
- BamlLocalizabilityResolver.cs
- SamlConditions.cs
- WebServiceMethodData.cs
- Queue.cs
- UriScheme.cs
- ExternalFile.cs
- ToolStripDropDownDesigner.cs
- HMACSHA512.cs
- Compiler.cs
- SmiXetterAccessMap.cs
- TimeoutException.cs
- SectionInput.cs
- AccessibleObject.cs
- SelectedDatesCollection.cs
- MetadataSource.cs
- AssociationSetMetadata.cs
- StrongNameHelpers.cs
- DataControlField.cs
- MultipleViewProviderWrapper.cs
- ResXResourceReader.cs
- AttachedAnnotation.cs
- XamlStyleSerializer.cs
- HyperLinkField.cs
- WebPartsPersonalization.cs
- BooleanSwitch.cs
- EpmSourceTree.cs
- FileSecurity.cs
- HashCodeCombiner.cs
- CheckBoxList.cs
- GraphicsPathIterator.cs
- ActivityStatusChangeEventArgs.cs
- CustomCategoryAttribute.cs
- ScrollPattern.cs
- UnitySerializationHolder.cs
- RepeatInfo.cs
- _LazyAsyncResult.cs
- FieldBuilder.cs
- DesignTimeValidationFeature.cs
- FormatterConverter.cs
- TreeWalkHelper.cs
- SoapClientMessage.cs
- TraceInternal.cs
- DataGridViewRowsAddedEventArgs.cs
- ObjectDisposedException.cs
- ResourceFallbackManager.cs
- Rule.cs
- NetworkInterface.cs
- PageVisual.cs
- RecordManager.cs
- ProcessHostConfigUtils.cs
- RewritingPass.cs
- ManagedFilter.cs
- RectangleConverter.cs
- Vector3DCollection.cs
- DataDocumentXPathNavigator.cs
- XLinq.cs
- NamespaceEmitter.cs
- FileLoadException.cs
- InputScopeConverter.cs
- WindowsAuthenticationEventArgs.cs
- TimerElapsedEvenArgs.cs
- _AcceptOverlappedAsyncResult.cs
- FontTypeConverter.cs
- serverconfig.cs
- BinHexDecoder.cs
- HintTextMaxWidthConverter.cs
- KeyToListMap.cs
- XmlIgnoreAttribute.cs
- CheckBoxAutomationPeer.cs
- DynamicRenderer.cs
- securestring.cs
- GroupBoxRenderer.cs
- Trigger.cs
- APCustomTypeDescriptor.cs
- XamlFilter.cs
- SqlDelegatedTransaction.cs
- CryptographicAttribute.cs
- CodeNamespaceImportCollection.cs
- SystemTcpStatistics.cs
- login.cs
- FrameworkPropertyMetadata.cs
- CachedCompositeFamily.cs
- ActivityExecutorDelegateInfo.cs
- XmlSchemaObjectCollection.cs
- InheritanceContextHelper.cs
- CookieParameter.cs
- TextParagraphView.cs
- NativeMethods.cs
- SecUtil.cs
- BrowserTree.cs
- RoleExceptions.cs
- NamespaceExpr.cs
- HtmlTitle.cs
- ContextInformation.cs
- XPathExpr.cs
- DetailsViewRow.cs
- SignatureHelper.cs