Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / SMSvcHost / System / ServiceModel / Activation / MessageQueue.cs / 1 / MessageQueue.cs
//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//-----------------------------------------------------------------------------
namespace System.ServiceModel.Activation
{
    using System;
    using System.Diagnostics;
    using System.Threading;
    using System.Collections.Generic;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Activation.Diagnostics;

    // Pairs pending session connections with WorkerProcess instances that are free to
    // take them. Pending sessions and free workers are kept in two queues (both guarded
    // by SessionLock); DispatchSession drains them pairwise.
    class MessageQueue
    {
        static WaitCallback dispatchToNewWorkerCallback = new WaitCallback(DispatchToNewWorkerCallback);
        static WaitCallback dispatchSessionCallback = new WaitCallback(DispatchSessionCallback);

        // Maps a registered base address to the queue that owns it. Guarded by lock (registry).
        static Dictionary<BaseUriWithWildcard, MessageQueue> registry = new Dictionary<BaseUriWithWildcard, MessageQueue>();

        // Every queue that has been constructed and not yet closed. Guarded by lock (instances).
        static List<MessageQueue> instances = new List<MessageQueue>();

        AsyncCallback dispatchSessionCompletedCallback;
        List<BaseUriWithWildcard> paths;

        // we use a queue of session-messages for dispatching
        // we use it to park messages that can't be dispatched and need to be pended
        // we use a queue of WorkerProcess instances to find free ones that can be dispatched to
        Queue<ListenerSessionConnection> sessionMessages;
        Queue<WorkerProcess> sessionWorkers;
        int maxQueueSize;
        TransportType transportType;

        // each MessageQueue has a list of WorkerProcess instances.
        // each WorkerProcess is associated to a single MessageQueue.
        // Self-Hosted: 1 WorkerProcess in the list at all times, always the same WorkerProcess (unless we DCR it). 1st WorkerProcess creates the MessageQueue, last WorkerProcess deletes the MessageQueue.
        // Web-Hosted: 0-n WorkerProcess in the list. MessageQueue created/delete by WAS explicitly or implicitly by WAS going away.
        //
        // NOTE(review): OnNewWorkerAvailable and SnapshotWorkers guard this list with
        // lock (this.workers), while CloseCore and Unregister(WorkerProcess) guard it with
        // lock (registry). The two locks do not exclude each other - confirm whether that is intended.
        List<WorkerProcess> workers;

        // Creates an empty queue with an Unsupported transport type (fixed later by
        // Register) and adds it to the static instance list.
        internal MessageQueue()
        {
            transportType = TransportType.Unsupported;
            paths = new List<BaseUriWithWildcard>();
            workers = new List<WorkerProcess>();
            sessionWorkers = new Queue<WorkerProcess>();
            sessionMessages = new Queue<ListenerSessionConnection>();
            dispatchSessionCompletedCallback = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(DispatchSessionCompletedCallback));

            lock (instances)
            {
                instances.Add(this);
            }
        }

#if DEBUG
        // Returns a copy of the worker list taken under lock (this.workers).
        internal List<WorkerProcess> SnapshotWorkers()
        {
            lock (this.workers)
            {
                return new List<WorkerProcess>(workers);
            }
        }
#endif

        // False only when this is a TCP queue and SMSvcHost reports TCP port sharing as paused.
        internal virtual bool CanDispatch
        {
            get
            {
                return TransportType != TransportType.Tcp || !SMSvcHost.IsTcpPortSharingPaused;
            }
        }

        internal TransportType TransportType
        {
            get
            {
                return transportType;
            }
        }

        // The object used to guard both sessionMessages and sessionWorkers.
        object SessionLock
        {
            get
            {
                return sessionWorkers;
            }
        }

        // Snapshots and clears the static instance list, then runs CloseCore on every
        // snapshotted queue whose transport type matches.
        // Note that the list is cleared for ALL transport types, not only the one being closed.
        internal static void CloseAll(TransportType transportType)
        {
            MessageQueue[] instancesCopy;
            lock (instances)
            {
                instancesCopy = instances.ToArray();
                instances.Clear();
            }

            foreach (MessageQueue messageQueue in instancesCopy)
            {
                if (messageQueue.TransportType == transportType)
                {
                    messageQueue.CloseCore();
                }
            }
        }

        // Number of sessions currently parked waiting for a worker.
        protected int PendingCount
        {
            get
            {
                lock (SessionLock)
                {
                    return sessionMessages.Count;
                }
            }
        }

        // Removes this queue from the static instance list and tears it down.
        protected void Close()
        {
            Debug.Print("MessageQueue.Close()");

            // this is only called when all the workers are done
            // with I/O (they could be in the process of closing)
            lock (instances)
            {
                instances.Remove(this);
            }

            // CloseCore emits the MessageQueueClosed trace event itself; tracing it again
            // here would report every close twice.
            CloseCore();
        }

        // Empties the pending-session queue. Each non-null pending connection is either
        // sent an EndpointUnavailable fault (sendFault == true) or aborted.
        protected void DropPendingMessages(bool sendFault)
        {
            lock (SessionLock)
            {
                foreach (ListenerSessionConnection sessionMessage in sessionMessages.ToArray())
                {
                    if (sessionMessage != null)
                    {
                        if (sendFault)
                        {
                            TransportListener.SendFault(sessionMessage.Connection, FramingEncodingString.EndpointUnavailableFault);
                        }
                        else
                        {
                            sessionMessage.Connection.Abort();
                        }
                    }
                }

                sessionMessages.Clear();
            }
        }

        // Unregisters every path, aborts pending sessions, closes and forgets every worker,
        // then traces MessageQueueClosed.
        void CloseCore()
        {
            Debug.Print("MessageQueue.CloseCore()");

            UnregisterAll();
            DropPendingMessages(false);

            lock (registry)
            {
                // Iterate over a copy: the list is cleared right after, and a copy keeps the
                // enumeration valid if the list changes while workers are being closed.
                foreach (WorkerProcess worker in workers.ToArray())
                {
                    worker.Close();
                }

                workers.Clear();
            }

            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueClosed, this);
            }
        }

        // Parks a new session and tries to dispatch it.
        // If the queue cannot dispatch, the connection is sent an EndpointUnavailable fault;
        // if the queue is full, the connection is aborted. Both cases count as a dispatch failure.
        internal void EnqueueSessionAndDispatch(ListenerSessionConnection session)
        {
            lock (SessionLock)
            {
                if (!CanDispatch)
                {
                    TransportListener.SendFault(session.Connection, FramingEncodingString.EndpointUnavailableFault);
                    OnDispatchFailure(transportType);
                    return;
                }
                else if (sessionMessages.Count >= maxQueueSize)
                {
                    // Abort the connection when the queue is full.
                    session.Connection.Abort();
                    OnDispatchFailure(transportType);
                    return;
                }
                else
                {
                    sessionMessages.Enqueue(session);
                }
            }

            OnSessionEnqueued();
            DispatchSession();
        }

        // Marks a worker as free and runs the dispatch loop, either inline or via
        // IOThreadScheduler when the caller must not dispatch on its own thread.
        void EnqueueWorkerAndDispatch(WorkerProcess worker, bool canDispatchOnThisThread)
        {
            lock (SessionLock)
            {
                sessionWorkers.Enqueue(worker);
            }

            if (canDispatchOnThisThread)
            {
                DispatchSession();
            }
            else
            {
                IOThreadScheduler.ScheduleCallback(dispatchSessionCallback, this);
            }
        }

        // IOThreadScheduler entry point; state is the MessageQueue to drain.
        static void DispatchSessionCallback(object state)
        {
            MessageQueue thisPtr = (MessageQueue)state;
            thisPtr.DispatchSession();
        }

        // Repeatedly pairs the oldest pending session with the next free worker that is
        // still registered, and starts the dispatch outside the lock. Stops when either
        // queue runs dry.
        void DispatchSession()
        {
            for (; ; )
            {
                ListenerSessionConnection session = null;
                lock (SessionLock)
                {
                    if (sessionMessages.Count > 0)
                    {
                        WorkerProcess worker = null;

                        // Free workers that are no longer registered are dequeued and discarded.
                        while (sessionWorkers.Count > 0)
                        {
                            worker = sessionWorkers.Dequeue();
                            if (worker.IsRegistered)
                            {
                                break;
                            }

                            worker = null;
                        }

                        if (worker == null)
                        {
                            // There is no more active worker. So break the loop.
                            break;
                        }

                        // For better performance, we may want to check whether the message has been timed out in the future.
                        session = sessionMessages.Dequeue();
                        session.WorkerProcess = worker;
                    }
                }

                if (session == null)
                {
                    // There is no more message left. So break the loop.
                    break;
                }

                StartDispatchSession(session);
            }
        }

        // Begins the asynchronous dispatch of a session to the worker assigned to it.
        // Non-fatal exceptions from BeginDispatchSession are traced and swallowed; the
        // worker is put back in the free queue if it is still registered.
        // NOTE(review): on that failure path the session itself is neither re-queued nor
        // faulted/aborted in this method - confirm that is handled elsewhere.
        void StartDispatchSession(ListenerSessionConnection session)
        {
            IAsyncResult dispatchAsyncResult = null;
            try
            {
                dispatchAsyncResult = session.WorkerProcess.BeginDispatchSession(session, dispatchSessionCompletedCallback, session);
            }
            catch (Exception exception)
            {
                if (DiagnosticUtility.IsFatal(exception))
                {
                    throw;
                }

                if (DiagnosticUtility.ShouldTraceWarning)
                {
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Warning);
                }

                if (session.WorkerProcess.IsRegistered)
                {
                    // Add the worker back to the queue.
                    EnqueueWorkerAndDispatch(session.WorkerProcess, false);
                }
            }

            // Synchronous completions are finished here; the callback ignores them.
            if (dispatchAsyncResult != null && dispatchAsyncResult.CompletedSynchronously)
            {
                CompleteDispatchSession(dispatchAsyncResult);
            }
        }

        // Async completion callback. Synchronous completions are handled by
        // StartDispatchSession, so they are skipped here.
        void DispatchSessionCompletedCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }

            CompleteDispatchSession(result);
        }

        // Ends the dispatch, faults the connection if the worker reports failure, and
        // returns the worker to the free queue.
        void CompleteDispatchSession(IAsyncResult result)
        {
            ListenerSessionConnection session = (ListenerSessionConnection)result.AsyncState;
            DiagnosticUtility.DebugAssert(session.WorkerProcess != null, "The WorkerProcess should be set on the message.");

            if (!session.WorkerProcess.EndDispatchSession(result))
            {
                OnConnectionDispatchFailed(session.Connection);
            }

            // After a synchronous completion we are still inside DispatchSession's loop on
            // this thread, so the follow-up dispatch is scheduled instead of run inline.
            EnqueueWorkerAndDispatch(session.WorkerProcess, !result.CompletedSynchronously);
        }

        // Whether a second worker may register a path that this queue already owns.
        protected virtual bool CanShare
        {
            get
            {
                return false;
            }
        }

        // Bumps the dispatch-failure perf counter for TCP or named pipe; other transport
        // types are ignored.
        internal static void OnDispatchFailure(TransportType transportType)
        {
            if (transportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementDispatchFailuresTcp();
            }
            else if (transportType == TransportType.NamedPipe)
            {
                ListenerPerfCounters.IncrementDispatchFailuresNamedPipe();
            }
        }

        // Sends a ConnectionDispatchFailed fault on the connection. Always returns false.
        bool OnConnectionDispatchFailed(IConnection connection)
        {
            TransportListener.SendFault(connection, FramingEncodingString.ConnectionDispatchFailedFault);
            return false;
        }

        // Attaches a worker to this queue and schedules a callback that enqueues it as free.
        protected void OnNewWorkerAvailable(WorkerProcess worker)
        {
            lock (this.workers)
            {
                worker.Queue = this;
                workers.Add(worker);

                // offload draining the IO queues to this new worker on a different thread
                IOThreadScheduler.ScheduleCallback(dispatchToNewWorkerCallback, worker);
            }
        }

        // IOThreadScheduler entry point; state is the newly attached WorkerProcess.
        static void DispatchToNewWorkerCallback(object state)
        {
            // Direct cast (as in DispatchSessionCallback): a wrong state object fails with
            // InvalidCastException here instead of a NullReferenceException on the next line.
            WorkerProcess worker = (WorkerProcess)state;
            worker.Queue.EnqueueWorkerAndDispatch(worker, true);
        }

        // Registers a base address with this queue.
        // The URI scheme fixes the queue's transport type and maximum pending-connection
        // count; a queue already bound to TCP refuses named-pipe addresses and vice versa,
        // and any other scheme is refused. On success the path is recorded, the
        // "URIs registered" counter is bumped and OnRegisterCompleted is invoked.
        // Returns the status from RoutingTable.Start, or ProtocolUnsupported.
        public ListenerExceptionStatus Register(BaseUriWithWildcard path)
        {
            if (path.BaseAddress.Scheme == Uri.UriSchemeNetTcp)
            {
                if (transportType == TransportType.NamedPipe)
                {
                    return ListenerExceptionStatus.ProtocolUnsupported;
                }

                maxQueueSize = ListenerConfig.NetTcp.MaxPendingConnections;
                transportType = TransportType.Tcp;
            }
            else if (path.BaseAddress.Scheme == Uri.UriSchemeNetPipe)
            {
                if (transportType == TransportType.Tcp)
                {
                    return ListenerExceptionStatus.ProtocolUnsupported;
                }

                maxQueueSize = ListenerConfig.NetPipe.MaxPendingConnections;
                transportType = TransportType.NamedPipe;
            }
            else
            {
                return ListenerExceptionStatus.ProtocolUnsupported;
            }

            ListenerExceptionStatus status = RoutingTable.Start(this, path);
            if (status == ListenerExceptionStatus.Success)
            {
                paths.Add(path);
                IncrementUrisRegisteredCounters();
                OnRegisterCompleted();
            }

            return status;
        }

        // Registers a worker for a path. If a queue already owns the path, it is reused when
        // it allows sharing (otherwise ConflictingRegistration is returned); if no queue
        // owns it, a new queue is created and registered. Non-fatal exceptions from the
        // registration are traced and reported as FailedToListen.
        internal static ListenerExceptionStatus Register(BaseUriWithWildcard path, WorkerProcess worker)
        {
            MessageQueue queue = null;
            lock (registry)
            {
                if (registry.TryGetValue(path, out queue))
                {
                    if (!queue.CanShare)
                    {
                        return ListenerExceptionStatus.ConflictingRegistration;
                    }
                }
                else
                {
                    queue = new MessageQueue();
                    ListenerExceptionStatus status = ListenerExceptionStatus.FailedToListen;

                    try
                    {
                        status = queue.Register(path);
                    }
                    catch (Exception exception)
                    {
                        if (DiagnosticUtility.IsFatal(exception))
                        {
                            throw;
                        }

                        if (DiagnosticUtility.ShouldTraceError)
                        {
                            ListenerTraceUtility.TraceEvent(TraceEventType.Error, TraceCode.RoutingTableCannotListen, new StringTraceRecord("Path", path.ToString()), null, exception);
                        }
                    }

                    if (status != ListenerExceptionStatus.Success)
                    {
                        // not setting the worker.queue is not a problem, since we can't use this WorkerProcess
                        return status;
                    }

                    registry.Add(path, queue);
                }
            }

            queue.OnNewWorkerAvailable(worker);
            return ListenerExceptionStatus.Success;
        }

        // Hook invoked after a session has been parked, before dispatch is attempted.
        protected virtual void OnSessionEnqueued()
        {
        }

        // Unregisters every path this queue owns.
        public void UnregisterAll()
        {
            // Unregister removes the path from the list, so index 0 always advances.
            while (paths.Count > 0)
            {
                Unregister(paths[0]);
            }
        }

        // Stops routing for one path and removes it from both the static registry and this
        // queue's own path list.
        void Unregister(BaseUriWithWildcard path)
        {
            DiagnosticUtility.DebugAssert(paths.Contains(path), "Unregister: unregistering an unregistered path");

            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueUnregisterSucceeded, new System.ServiceModel.Diagnostics.StringTraceRecord("Path", path.ToString()), this, null);
            }

            RoutingTable.Stop(this, path);
            IncrementUrisUnregisteredCounters();
            OnUnregisterCompleted();

            // The static registry is mutated under lock (registry) in the static Register.
            // CloseAll and the public UnregisterAll reach this point without holding that
            // lock, so take it here. The lock is reentrant, so the path through
            // Unregister(WorkerProcess), which already holds it, is unaffected.
            lock (registry)
            {
                registry.Remove(path);
            }

            paths.Remove(path);
        }

        // Invoked when the last worker has been removed; the default closes the queue.
        protected virtual void OnUnregisterLastWorker()
        {
            Debug.Print("MessageQueue.OnUnregisterLastWorker() calling Close()");
            Close();
        }

        // Detaches a worker from this queue; when none are left, OnUnregisterLastWorker runs
        // while lock (registry) is still held.
        internal virtual void Unregister(WorkerProcess worker)
        {
            Debug.Print("MessageQueue.Unregister() worker: " + worker.ProcessId);

            lock (registry)
            {
                DiagnosticUtility.DebugAssert(object.Equals(this, worker.Queue), "MessageQueue.Unregister() cannot unregister a worker registered with a queue different than this.");

                workers.Remove(worker);
                Debug.Print("MessageQueue.Unregister() left with workers: " + workers.Count);

                if (workers.Count == 0)
                {
                    OnUnregisterLastWorker();
                }
            }
        }

        // Hook invoked after a path has been registered successfully.
        protected virtual void OnRegisterCompleted()
        {
            IncrementRegistrationsActiveCounters();
        }

        // Hook invoked after routing for a path has been stopped.
        protected virtual void OnUnregisterCompleted()
        {
            DecrementRegistrationsActiveCounters();
        }

        // The four counter helpers below treat every non-TCP transport type as named pipe.

        protected void IncrementRegistrationsActiveCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementRegistrationsActiveTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementRegistrationsActiveNamedPipe();
            }
        }

        protected void DecrementRegistrationsActiveCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.DecrementRegistrationsActiveTcp();
            }
            else
            {
                ListenerPerfCounters.DecrementRegistrationsActiveNamedPipe();
            }
        }

        void IncrementUrisUnregisteredCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementUrisUnregisteredTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementUrisUnregisteredNamedPipe();
            }
        }

        void IncrementUrisRegisteredCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementUrisRegisteredTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementUrisRegisteredNamedPipe();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- XmlSchemaImporter.cs
- DecoderNLS.cs
- ToolStripDropDownItem.cs
- InternalEnumValidatorAttribute.cs
- KeyBinding.cs
- HwndSourceKeyboardInputSite.cs
- AttachInfo.cs
- FontCacheLogic.cs
- StylusOverProperty.cs
- ModuleBuilderData.cs
- XmlDataDocument.cs
- XmlDigitalSignatureProcessor.cs
- NameObjectCollectionBase.cs
- SerialReceived.cs
- Win32MouseDevice.cs
- followingsibling.cs
- TemplateBaseAction.cs
- StoreContentChangedEventArgs.cs
- DesignBinding.cs
- NetDispatcherFaultException.cs
- TableColumn.cs
- IdentityValidationException.cs
- FieldTemplateFactory.cs
- HandlerBase.cs
- ValidatedMobileControlConverter.cs
- SafeViewOfFileHandle.cs
- EvidenceTypeDescriptor.cs
- DropTarget.cs
- DecimalAnimationUsingKeyFrames.cs
- XpsFilter.cs
- Subtree.cs
- DataBindEngine.cs
- DynamicPropertyReader.cs
- SafeTimerHandle.cs
- FunctionGenerator.cs
- LogicalChannel.cs
- DrawingCollection.cs
- SettingsSavedEventArgs.cs
- GB18030Encoding.cs
- SHA384Managed.cs
- FrameworkElement.cs
- XmlTypeAttribute.cs
- AsyncPostBackTrigger.cs
- AbstractDataSvcMapFileLoader.cs
- KeyNotFoundException.cs
- SafeLibraryHandle.cs
- PageBuildProvider.cs
- PerformanceCounter.cs
- CodeTypeOfExpression.cs
- Base64Encoder.cs
- FeatureSupport.cs
- QueryExpr.cs
- TreeViewEvent.cs
- ImageFormatConverter.cs
- Peer.cs
- TrustManagerPromptUI.cs
- TypeConverterHelper.cs
- __Filters.cs
- RootBrowserWindowProxy.cs
- AssociationType.cs
- ServiceRouteHandler.cs
- _OSSOCK.cs
- XmlException.cs
- StreamGeometry.cs
- WsatConfiguration.cs
- MsmqHostedTransportManager.cs
- PermissionSetTriple.cs
- CodeCompileUnit.cs
- StreamSecurityUpgradeAcceptorAsyncResult.cs
- RuleInfoComparer.cs
- TypedDatasetGenerator.cs
- Point4D.cs
- TabOrder.cs
- Config.cs
- UniqueIdentifierService.cs
- CodeGotoStatement.cs
- DiffuseMaterial.cs
- Metafile.cs
- StylusCollection.cs
- SmtpFailedRecipientException.cs
- SecurityResources.cs
- DataSourceControl.cs
- SourceFileInfo.cs
- ConsoleKeyInfo.cs
- DiscoveryViaBehavior.cs
- EdmType.cs
- SiteMapSection.cs
- EntityClientCacheKey.cs
- HostedElements.cs
- UserControlCodeDomTreeGenerator.cs
- RectConverter.cs
- TrackingStringDictionary.cs
- CommandLineParser.cs
- ContextInformation.cs
- SimpleBitVector32.cs
- EntitySqlQueryCacheEntry.cs
- Semaphore.cs
- MatchingStyle.cs
- XmlCharType.cs
- CompoundFileStreamReference.cs