Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / SMSvcHost / System / ServiceModel / Activation / MessageQueue.cs / 1 / MessageQueue.cs
//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
namespace System.ServiceModel.Activation
{
    using System;
    using System.Diagnostics;
    using System.Threading;
    using System.Collections.Generic;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Activation.Diagnostics;

    // Pairs pending session connections with registered WorkerProcess instances.
    // Sessions wait in sessionMessages, free workers wait in sessionWorkers, and
    // DispatchSession() hands one session to one worker at a time.
    class MessageQueue
    {
        static WaitCallback dispatchToNewWorkerCallback = new WaitCallback(DispatchToNewWorkerCallback);
        static WaitCallback dispatchSessionCallback = new WaitCallback(DispatchSessionCallback);

        // Maps each registered base address to the queue that owns it.
        // Accessed under lock (registry).
        static Dictionary<BaseUriWithWildcard, MessageQueue> registry = new Dictionary<BaseUriWithWildcard, MessageQueue>();

        // Every MessageQueue constructed and not yet closed. Accessed under lock (instances).
        static List<MessageQueue> instances = new List<MessageQueue>();

        AsyncCallback dispatchSessionCompletedCallback;
        List<BaseUriWithWildcard> paths;

        // we use a queue of session-messages for dispatching
        // we use it to park messages that can't be dispatched and need to be pended
        // we use a queue of WorkerProcess instances to find free ones that can be dispatched to
        Queue<ListenerSessionConnection> sessionMessages;
        Queue<WorkerProcess> sessionWorkers;

        // Upper bound on sessionMessages.Count; assigned in Register(path) from ListenerConfig.
        int maxQueueSize;
        TransportType transportType;

        // each MessageQueue has a list of WorkerProcess instances.
        // each WorkerProcess is associated to a single MessageQueue.
        // Self-Hosted: 1 WorkerProcess in the list at all times, always the same WorkerProcess (unless we DCR it). 1st WorkerProcess creates the MessageQueue, last WorkerProcess deletes the MessageQueue.
        // Web-Hosted: 0-n WorkerProcess in the list. MessageQueue created/delete by WAS explicitly or implicitly by WAS going away.
        //
        // NOTE(review): OnNewWorkerAvailable and SnapshotWorkers guard this list with
        // lock (this.workers), while CloseCore and Unregister(WorkerProcess) guard it
        // with lock (registry) - confirm whether a single lock was intended.
        List<WorkerProcess> workers;

        // Creates an empty queue with no transport assigned yet and adds it to the
        // global instance list.
        internal MessageQueue()
        {
            transportType = TransportType.Unsupported;
            paths = new List<BaseUriWithWildcard>();
            workers = new List<WorkerProcess>();
            sessionWorkers = new Queue<WorkerProcess>();
            sessionMessages = new Queue<ListenerSessionConnection>();
            dispatchSessionCompletedCallback = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(DispatchSessionCompletedCallback));

            lock (instances)
            {
                instances.Add(this);
            }
        }

#if DEBUG
        // Returns a copy of the current worker list.
        internal List<WorkerProcess> SnapshotWorkers()
        {
            lock (this.workers)
            {
                return new List<WorkerProcess>(workers);
            }
        }
#endif

        // False only for a TCP queue while SMSvcHost reports TCP port sharing as paused.
        internal virtual bool CanDispatch
        {
            get
            {
                return TransportType != TransportType.Tcp || !SMSvcHost.IsTcpPortSharingPaused;
            }
        }

        internal TransportType TransportType
        {
            get
            {
                return transportType;
            }
        }

        // The object used to guard both sessionMessages and sessionWorkers.
        object SessionLock
        {
            get
            {
                return sessionWorkers;
            }
        }

        // Closes every live queue whose transport matches transportType.
        internal static void CloseAll(TransportType transportType)
        {
            List<MessageQueue> queuesToClose = new List<MessageQueue>();

            lock (instances)
            {
                foreach (MessageQueue messageQueue in instances)
                {
                    if (messageQueue.TransportType == transportType)
                    {
                        queuesToClose.Add(messageQueue);
                    }
                }

                // Only forget the queues that are actually being closed. Clearing the
                // whole list would drop queues of the other transport, and a later
                // CloseAll for that transport could no longer find them.
                foreach (MessageQueue messageQueue in queuesToClose)
                {
                    instances.Remove(messageQueue);
                }
            }

            // CloseCore runs outside the instances lock.
            foreach (MessageQueue messageQueue in queuesToClose)
            {
                messageQueue.CloseCore();
            }
        }

        // Number of sessions currently waiting for a worker.
        protected int PendingCount
        {
            get
            {
                lock (SessionLock)
                {
                    return sessionMessages.Count;
                }
            }
        }

        // Removes this queue from the global instance list and closes it.
        protected void Close()
        {
            Debug.Print("MessageQueue.Close()");

            // this is only called when all the workers are done
            // with I/O (they could be in the process of closing)
            lock (instances)
            {
                instances.Remove(this);
            }

            CloseCore();

            // NOTE(review): CloseCore() emits the same MessageQueueClosed trace, so the
            // event is recorded twice on this path.
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueClosed, this);
            }
        }

        // Empties the pending-session queue. Each pending connection is either sent an
        // EndpointUnavailable fault (sendFault == true) or aborted (sendFault == false).
        protected void DropPendingMessages(bool sendFault)
        {
            lock (SessionLock)
            {
                foreach (ListenerSessionConnection sessionMessage in sessionMessages.ToArray())
                {
                    if (sessionMessage != null)
                    {
                        if (sendFault)
                        {
                            TransportListener.SendFault(sessionMessage.Connection, FramingEncodingString.EndpointUnavailableFault);
                        }
                        else
                        {
                            sessionMessage.Connection.Abort();
                        }
                    }
                }

                sessionMessages.Clear();
            }
        }

        // Unregisters all paths, aborts pending sessions, then closes and forgets all workers.
        void CloseCore()
        {
            Debug.Print("MessageQueue.CloseCore()");

            UnregisterAll();
            DropPendingMessages(false);

            lock (registry)
            {
                // Iterate over a copy of the list.
                foreach (WorkerProcess worker in workers.ToArray())
                {
                    worker.Close();
                }

                workers.Clear();
            }

            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueClosed, this);
            }
        }

        // Queues an incoming session and tries to dispatch it. The session is rejected
        // (fault or abort) and counted as a dispatch failure when the queue cannot
        // dispatch or is already full.
        internal void EnqueueSessionAndDispatch(ListenerSessionConnection session)
        {
            lock (SessionLock)
            {
                if (!CanDispatch)
                {
                    TransportListener.SendFault(session.Connection, FramingEncodingString.EndpointUnavailableFault);
                    OnDispatchFailure(transportType);
                    return;
                }
                else if (sessionMessages.Count >= maxQueueSize)
                {
                    // Abort the connection when the queue is full.
                    session.Connection.Abort();
                    OnDispatchFailure(transportType);
                    return;
                }
                else
                {
                    sessionMessages.Enqueue(session);
                }
            }

            OnSessionEnqueued();
            DispatchSession();
        }

        // Marks a worker as free and runs the dispatch loop, either inline or on a
        // thread scheduled through IOThreadScheduler.
        void EnqueueWorkerAndDispatch(WorkerProcess worker, bool canDispatchOnThisThread)
        {
            lock (SessionLock)
            {
                sessionWorkers.Enqueue(worker);
            }

            if (canDispatchOnThisThread)
            {
                DispatchSession();
            }
            else
            {
                IOThreadScheduler.ScheduleCallback(dispatchSessionCallback, this);
            }
        }

        // WaitCallback thunk: state is the MessageQueue to run the dispatch loop on.
        static void DispatchSessionCallback(object state)
        {
            MessageQueue thisPtr = (MessageQueue)state;
            thisPtr.DispatchSession();
        }

        // Dispatch loop: while there is both a pending session and a registered free
        // worker, pair them and start the dispatch.
        void DispatchSession()
        {
            for (; ; )
            {
                ListenerSessionConnection session = null;

                lock (SessionLock)
                {
                    if (sessionMessages.Count > 0)
                    {
                        WorkerProcess worker = null;

                        // Workers that are no longer registered are dequeued and discarded.
                        while (sessionWorkers.Count > 0)
                        {
                            worker = sessionWorkers.Dequeue();
                            if (worker.IsRegistered)
                            {
                                break;
                            }

                            worker = null;
                        }

                        if (worker == null)
                        {
                            // There is no more active worker. So break the loop.
                            break;
                        }

                        // For better performance, we may want to check whether the message has been timed out in the future.
                        session = sessionMessages.Dequeue();
                        session.WorkerProcess = worker;
                    }
                }

                if (session == null)
                {
                    // There are no more messages left. So break the loop.
                    break;
                }

                // The dispatch itself starts outside SessionLock.
                StartDispatchSession(session);
            }
        }

        // Begins the asynchronous dispatch of a session to the worker assigned to it.
        void StartDispatchSession(ListenerSessionConnection session)
        {
            IAsyncResult dispatchAsyncResult = null;

            try
            {
                dispatchAsyncResult = session.WorkerProcess.BeginDispatchSession(session, dispatchSessionCompletedCallback, session);
            }
            catch (Exception exception)
            {
                if (DiagnosticUtility.IsFatal(exception))
                {
                    throw;
                }

                if (DiagnosticUtility.ShouldTraceWarning)
                {
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Warning);
                }

                // NOTE(review): the session itself is neither re-queued nor faulted on
                // this path - confirm BeginDispatchSession cleans up the connection
                // when it throws.
                if (session.WorkerProcess.IsRegistered)
                {
                    // Add the worker back to the queue.
                    EnqueueWorkerAndDispatch(session.WorkerProcess, false);
                }
            }

            // Synchronous completions are finished here; DispatchSessionCompletedCallback
            // ignores them.
            if (dispatchAsyncResult != null && dispatchAsyncResult.CompletedSynchronously)
            {
                CompleteDispatchSession(dispatchAsyncResult);
            }
        }

        // AsyncCallback for BeginDispatchSession. Synchronous completions are handled
        // by StartDispatchSession, so they are skipped here.
        void DispatchSessionCompletedCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }

            CompleteDispatchSession(result);
        }

        // Ends the dispatch, faults the connection if the worker reported failure, and
        // returns the worker to the free queue.
        void CompleteDispatchSession(IAsyncResult result)
        {
            ListenerSessionConnection session = (ListenerSessionConnection)result.AsyncState;
            DiagnosticUtility.DebugAssert(session.WorkerProcess != null, "The WorkerProcess should be set on the message.");

            if (!session.WorkerProcess.EndDispatchSession(result))
            {
                OnConnectionDispatchFailed(session.Connection);
            }

            // A synchronous completion is reached from StartDispatchSession, so the next
            // dispatch is scheduled instead of run inline (presumably to avoid
            // re-entering the dispatch loop - confirm).
            EnqueueWorkerAndDispatch(session.WorkerProcess, !result.CompletedSynchronously);
        }

        // Whether a second worker may register a path this queue already owns.
        protected virtual bool CanShare
        {
            get
            {
                return false;
            }
        }

        // Bumps the dispatch-failure perf counter for TCP or named pipe; other
        // transport types are ignored.
        internal static void OnDispatchFailure(TransportType transportType)
        {
            if (transportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementDispatchFailuresTcp();
            }
            else if (transportType == TransportType.NamedPipe)
            {
                ListenerPerfCounters.IncrementDispatchFailuresNamedPipe();
            }
        }

        // Sends the ConnectionDispatchFailed fault on the connection. Always returns false.
        bool OnConnectionDispatchFailed(IConnection connection)
        {
            TransportListener.SendFault(connection, FramingEncodingString.ConnectionDispatchFailedFault);
            return false;
        }

        // Attaches a worker to this queue and schedules a dispatch pass for it.
        protected void OnNewWorkerAvailable(WorkerProcess worker)
        {
            lock (this.workers)
            {
                worker.Queue = this;
                workers.Add(worker);

                // offload draining the IO queues to this new worker on a different thread
                IOThreadScheduler.ScheduleCallback(dispatchToNewWorkerCallback, worker);
            }
        }

        // WaitCallback thunk: state is the WorkerProcess that just became available.
        static void DispatchToNewWorkerCallback(object state)
        {
            // Direct cast: a wrong state type fails here with InvalidCastException
            // instead of a NullReferenceException on the next line.
            WorkerProcess worker = (WorkerProcess)state;
            worker.Queue.EnqueueWorkerAndDispatch(worker, true);
        }

        // Registers a base address with the routing table. The first registration fixes
        // the queue's transport; mixing net.tcp and net.pipe on one queue, or using any
        // other scheme, returns ProtocolUnsupported.
        public ListenerExceptionStatus Register(BaseUriWithWildcard path)
        {
            if (path.BaseAddress.Scheme == Uri.UriSchemeNetTcp)
            {
                if (transportType == TransportType.NamedPipe)
                {
                    return ListenerExceptionStatus.ProtocolUnsupported;
                }

                maxQueueSize = ListenerConfig.NetTcp.MaxPendingConnections;
                transportType = TransportType.Tcp;
            }
            else if (path.BaseAddress.Scheme == Uri.UriSchemeNetPipe)
            {
                if (transportType == TransportType.Tcp)
                {
                    return ListenerExceptionStatus.ProtocolUnsupported;
                }

                maxQueueSize = ListenerConfig.NetPipe.MaxPendingConnections;
                transportType = TransportType.NamedPipe;
            }
            else
            {
                return ListenerExceptionStatus.ProtocolUnsupported;
            }

            ListenerExceptionStatus status = RoutingTable.Start(this, path);
            if (status == ListenerExceptionStatus.Success)
            {
                paths.Add(path);
                IncrementUrisRegisteredCounters();
                OnRegisterCompleted();
            }

            return status;
        }

        // Registers a worker for a path. Reuses the queue that already owns the path
        // when that queue allows sharing; otherwise creates and registers a new queue.
        internal static ListenerExceptionStatus Register(BaseUriWithWildcard path, WorkerProcess worker)
        {
            MessageQueue queue = null;

            lock (registry)
            {
                if (registry.TryGetValue(path, out queue))
                {
                    if (!queue.CanShare)
                    {
                        return ListenerExceptionStatus.ConflictingRegistration;
                    }
                }
                else
                {
                    queue = new MessageQueue();
                    ListenerExceptionStatus status = ListenerExceptionStatus.FailedToListen;

                    try
                    {
                        status = queue.Register(path);
                    }
                    catch (Exception exception)
                    {
                        if (DiagnosticUtility.IsFatal(exception))
                        {
                            throw;
                        }

                        // Non-fatal failures are traced; status stays FailedToListen.
                        if (DiagnosticUtility.ShouldTraceError)
                        {
                            ListenerTraceUtility.TraceEvent(TraceEventType.Error, TraceCode.RoutingTableCannotListen, new StringTraceRecord("Path", path.ToString()), null, exception);
                        }
                    }

                    if (status != ListenerExceptionStatus.Success)
                    {
                        // not setting the worker.queue is not a problem, since we can't use this WorkerProcess
                        return status;
                    }

                    registry.Add(path, queue);
                }
            }

            queue.OnNewWorkerAvailable(worker);
            return ListenerExceptionStatus.Success;
        }

        // Hook invoked after a session has been queued and before dispatch is attempted.
        protected virtual void OnSessionEnqueued()
        {
        }

        // Unregisters every path this queue has registered.
        public void UnregisterAll()
        {
            while (paths.Count > 0)
            {
                Unregister(paths[0]);
            }
        }

        // Stops routing for one path and removes it from this queue and the registry.
        void Unregister(BaseUriWithWildcard path)
        {
            DiagnosticUtility.DebugAssert(paths.Contains(path), "Unregister: unregistering an unregistered path");

            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueUnregisterSucceeded, new System.ServiceModel.Diagnostics.StringTraceRecord("Path", path.ToString()), this, null);
            }

            RoutingTable.Stop(this, path);
            IncrementUrisUnregisteredCounters();
            OnUnregisterCompleted();

            // The static registry is guarded by lock (registry) everywhere else, and this
            // method is reachable without that lock (CloseAll -> CloseCore -> UnregisterAll).
            // The lock is reentrant, so the path that already holds it is unaffected.
            lock (registry)
            {
                registry.Remove(path);
            }

            paths.Remove(path);
        }

        // Called when the last worker has been removed; the default closes the queue.
        protected virtual void OnUnregisterLastWorker()
        {
            Debug.Print("MessageQueue.OnUnregisterLastWorker() calling Close()");
            Close();
        }

        // Detaches a worker from this queue and invokes OnUnregisterLastWorker when
        // none remain.
        internal virtual void Unregister(WorkerProcess worker)
        {
            Debug.Print("MessageQueue.Unregister() worker: " + worker.ProcessId);

            lock (registry)
            {
                DiagnosticUtility.DebugAssert(object.Equals(this, worker.Queue), "MessageQueue.Unregister() cannot unregister a worker registered with a queue different than this.");

                workers.Remove(worker);
                Debug.Print("MessageQueue.Unregister() left with workers: " + workers.Count);

                if (workers.Count == 0)
                {
                    OnUnregisterLastWorker();
                }
            }
        }

        protected virtual void OnRegisterCompleted()
        {
            IncrementRegistrationsActiveCounters();
        }

        protected virtual void OnUnregisterCompleted()
        {
            DecrementRegistrationsActiveCounters();
        }

        // The counter helpers below treat every non-TCP transport as named pipe.
        protected void IncrementRegistrationsActiveCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementRegistrationsActiveTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementRegistrationsActiveNamedPipe();
            }
        }

        protected void DecrementRegistrationsActiveCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.DecrementRegistrationsActiveTcp();
            }
            else
            {
                ListenerPerfCounters.DecrementRegistrationsActiveNamedPipe();
            }
        }

        void IncrementUrisUnregisteredCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementUrisUnregisteredTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementUrisUnregisteredNamedPipe();
            }
        }

        void IncrementUrisRegisteredCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementUrisRegisteredTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementUrisRegisteredNamedPipe();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- NetSectionGroup.cs
- HtmlInputSubmit.cs
- EdgeModeValidation.cs
- CaretElement.cs
- PasswordRecovery.cs
- InheritanceContextChangedEventManager.cs
- SafeCryptHandles.cs
- DelegatingConfigHost.cs
- X500Name.cs
- MonthCalendar.cs
- CryptographicAttribute.cs
- coordinatorscratchpad.cs
- PersistChildrenAttribute.cs
- MouseButtonEventArgs.cs
- QueryTaskGroupState.cs
- TypeResolver.cs
- EpmCustomContentDeSerializer.cs
- TabItemAutomationPeer.cs
- PrintControllerWithStatusDialog.cs
- OleDbErrorCollection.cs
- UnsafeNativeMethodsTablet.cs
- PageAdapter.cs
- NullableLongAverageAggregationOperator.cs
- MetabaseServerConfig.cs
- BrowserCapabilitiesCodeGenerator.cs
- SafeUserTokenHandle.cs
- RestClientProxyHandler.cs
- UnsafeNativeMethodsMilCoreApi.cs
- LockCookie.cs
- DtdParser.cs
- InvalidDataException.cs
- EventEntry.cs
- PropertyToken.cs
- PriorityBinding.cs
- TypedElement.cs
- XmlSchemaFacet.cs
- SelectingProviderEventArgs.cs
- Utility.cs
- EncryptedXml.cs
- ScaleTransform.cs
- StringFormat.cs
- MarginCollapsingState.cs
- Walker.cs
- ValidationEventArgs.cs
- ToolTip.cs
- HScrollBar.cs
- dbdatarecord.cs
- MenuItem.cs
- DependencyObjectValidator.cs
- AttachedAnnotation.cs
- FormClosedEvent.cs
- RegexWorker.cs
- EntityDataSourceWrapperPropertyDescriptor.cs
- StylusCollection.cs
- NamespaceTable.cs
- TableDetailsRow.cs
- CodeIterationStatement.cs
- RegistryPermission.cs
- ParameterCollection.cs
- dataprotectionpermission.cs
- WebPartUserCapability.cs
- ExpandCollapseProviderWrapper.cs
- HMACSHA256.cs
- CDSsyncETWBCLProvider.cs
- ArrayElementGridEntry.cs
- ParameterExpression.cs
- RegistrySecurity.cs
- ZipIOExtraField.cs
- NamespaceDisplay.xaml.cs
- WMIGenerator.cs
- GatewayDefinition.cs
- ReferenceService.cs
- Timer.cs
- CorrelationManager.cs
- EntityContainerEmitter.cs
- ContentElementAutomationPeer.cs
- IdentityModelStringsVersion1.cs
- EntityDataSourceViewSchema.cs
- SoapAttributes.cs
- DataStreamFromComStream.cs
- FontStretchConverter.cs
- HtmlInputText.cs
- CryptographicAttribute.cs
- XmlBaseWriter.cs
- NameTable.cs
- DragEventArgs.cs
- ImportContext.cs
- DataServiceKeyAttribute.cs
- Calendar.cs
- SoapEnumAttribute.cs
- MetadataWorkspace.cs
- DynamicScriptObject.cs
- XmlDataDocument.cs
- AsyncPostBackTrigger.cs
- ServicePoint.cs
- ToolStripCodeDomSerializer.cs
- SiteMembershipCondition.cs
- CompilerLocalReference.cs
- PolyQuadraticBezierSegmentFigureLogic.cs
- DispatcherHooks.cs