Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / SMSvcHost / System / ServiceModel / Activation / MsmqActivation.cs / 1 / MsmqActivation.cs
//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------------------------

namespace System.ServiceModel.Activation
{
    using System.ServiceModel.Activation.Diagnostics;
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Net;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.ServiceProcess;
    using System.Threading;
    using System.Messaging;
    using System.ServiceModel.Diagnostics;

    using MQMessageQueue = System.Messaging.MessageQueue;
    using MQMessage = System.Messaging.Message;
    using MQException = System.Messaging.MessageQueueException;

    /// <summary>
    /// Service (derived from <see cref="ServiceBase"/>) that owns one
    /// <see cref="ActivationService"/> / ListenerAdapter pair for each of the two
    /// MSMQ address schemes (format-name "integration" and net.msmq "transport").
    /// </summary>
    class MsmqActivation : ServiceBase
    {
        BindingsManager bindings;
        ActivationService integrationActivationService;
        ListenerAdapter integrationListenerAdapter;
        ActivationService transportActivationService;
        ListenerAdapter transportListenerAdapter;

        public MsmqActivation()
        {
            ServiceName = ListenerConstants.MsmqActivationServiceName;
            CanHandlePowerEvent = false;
            AutoLog = false;
            CanStop = true;
            CanPauseAndContinue = true;
            CanShutdown = true;

            // Both activation services share the single BindingsManager created here.
            this.bindings = new BindingsManager();
            this.integrationActivationService = new ActivationService(this, MsmqUri.FormatNameAddressTranslator.Scheme);
            this.transportActivationService = new ActivationService(this, MsmqUri.NetMsmqAddressTranslator.Scheme);
        }

        /// <summary>
        /// Traces the start request and opens the listener adapters. Any failure is
        /// written to the event log and then rethrown.
        /// </summary>
        protected override void OnStart(string[] args)
        {
            try
            {
                if (DiagnosticUtility.ShouldTraceInformation)
                {
                    ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceStart, this);
                }

#if DEBUG
                if (DebuggableService.DelayStart(ServiceName))
                {
                    (new Thread(new ThreadStart(Start))).Start();
                    return;
                }
#endif
                Start();
            }
            catch (Exception exception)
            {
                // Log the error to eventlog.
                ListenerTraceUtility.EventLog.LogEvent(TraceEventType.Error,
                    EventLogCategory.ListenerAdapter,
                    EventLogEventId.ServiceStartFailed,
                    false,
                    exception.ToString());

                throw;
            }
        }

        protected override void OnStop()
        {
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceStop, this);
            }

            Shutdown();
        }

        /// <summary>Clears the paused flag on both activation services.</summary>
        protected override void OnContinue()
        {
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceContinue, this);
            }

            this.integrationActivationService.Paused = false;
            this.transportActivationService.Paused = false;
        }

        /// <summary>Sets the paused flag on both activation services.</summary>
        protected override void OnPause()
        {
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServicePause, this);
            }

            this.integrationActivationService.Paused = true;
            this.transportActivationService.Paused = true;
        }

        protected override void OnShutdown()
        {
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceShutdown, this);
            }

            Shutdown();
            Stop();
        }

        /// <summary>
        /// Creates and opens the two listener adapters.
        /// </summary>
        /// <exception cref="NotSupportedException">
        /// Thrown (after setting <c>ExitCode</c> to ERROR_NOT_SUPPORTED) when
        /// <c>SMSvcHost.IsWebhostSupported</c> is false.
        /// </exception>
        void Start()
        {
#if DEBUG
            DebuggableService.WaitForDebugger(ServiceName);
#endif
            if (!SMSvcHost.IsWebhostSupported)
            {
                const int ERROR_NOT_SUPPORTED = 50;
                this.ExitCode = ERROR_NOT_SUPPORTED;
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ServiceRequiresWas)));
            }

            this.integrationListenerAdapter = new ListenerAdapter(this.integrationActivationService);
            this.transportListenerAdapter = new ListenerAdapter(this.transportActivationService);

            this.integrationListenerAdapter.Open();
            this.transportListenerAdapter.Open();
        }

        /// <summary>
        /// Closes whichever listener adapters currently exist.
        /// </summary>
        /// <remarks>
        /// The adapters are null if <see cref="Start"/> failed before creating them
        /// (or never ran), and OnShutdown reaches this method twice (directly, then
        /// again through Stop() -> OnStop). Detaching each field before closing makes
        /// both situations a harmless no-op instead of a NullReferenceException or a
        /// double Close.
        /// </remarks>
        void Shutdown()
        {
            ListenerAdapter integration = this.integrationListenerAdapter;
            this.integrationListenerAdapter = null;

            ListenerAdapter transport = this.transportListenerAdapter;
            this.transportListenerAdapter = null;

            if (integration != null)
            {
                integration.Close();
            }

            if (transport != null)
            {
                transport.Close();
            }
        }

        /// <summary>
        /// Keeps one MsmqBindingMonitor per host name (compared case-insensitively)
        /// and routes binding filters to them.
        /// </summary>
        class BindingsManager
        {
            Dictionary<string, MsmqBindingMonitor> bindingMonitors;
            object thisLock = new object();

            public BindingsManager()
            {
                this.bindingMonitors = new Dictionary<string, MsmqBindingMonitor>(StringComparer.OrdinalIgnoreCase);
            }

            /// <summary>
            /// Ensures a monitor exists (and is opened) for <paramref name="host"/> and
            /// adds <paramref name="filter"/> to it unless it is already present.
            /// </summary>
            public void RegisterBindingFilterIfNecessary(string host, MsmqBindingFilter filter)
            {
                lock (this.thisLock)
                {
                    MsmqBindingMonitor bindingMonitor;
                    if (!this.bindingMonitors.TryGetValue(host, out bindingMonitor))
                    {
                        bindingMonitor = new MsmqBindingMonitor(host);
                        this.bindingMonitors.Add(host, bindingMonitor);
                        bindingMonitor.Open();
                    }

                    // register the new filter if it doesn't already exist:
                    if (!bindingMonitor.ContainsFilter(filter))
                    {
                        bindingMonitor.AddFilter(filter);
                    }
                }
            }

            /// <summary>Removes <paramref name="filter"/> from every known monitor.</summary>
            public void UnregisterBindingFilter(MsmqBindingFilter filter)
            {
                lock (this.thisLock)
                {
                    foreach (MsmqBindingMonitor monitor in this.bindingMonitors.Values)
                    {
                        monitor.RemoveFilter(filter);
                    }
                }
            }
        }

        /// <summary>
        /// IActivationService implementation for one protocol scheme. Tracks the
        /// <see cref="QueueMonitorGroup"/> instances it created, keyed by listener
        /// channel id.
        /// </summary>
        class ActivationService : IActivationService
        {
            Dictionary<int, QueueMonitorGroup> groups;
            string protocol;
            BindingsManager bindings;
            object thisLock = new object();
            ServiceBase service;
            bool paused;

            public ActivationService(MsmqActivation service, string protocol)
            {
                this.protocol = protocol;
                this.bindings = service.bindings;
                this.service = service;
                this.paused = false;
                this.groups = new Dictionary<int, QueueMonitorGroup>();
            }

            /// <summary>
            /// Gets or sets the paused flag. Changing it from true to false starts every
            /// registered group.
            /// </summary>
            public bool Paused
            {
                get { return this.paused; }
                set
                {
                    lock (this)
                    {
                        if (this.paused != value)
                        {
                            this.paused = value;

                            if (!this.paused)
                            {
                                // 'groups' is mutated under thisLock (CreateQueue /
                                // QueueMonitorGroupClosed), so take a snapshot under that
                                // same lock rather than enumerating the live dictionary.
                                // Starting the groups happens outside thisLock so no
                                // group/monitor lock is ever acquired while holding it.
                                List<QueueMonitorGroup> snapshot;
                                lock (this.thisLock)
                                {
                                    snapshot = new List<QueueMonitorGroup>(this.groups.Values);
                                }

                                foreach (QueueMonitorGroup group in snapshot)
                                {
                                    group.Start();
                                }
                            }
                        }
                    }
                }
            }

            public BindingsManager Bindings
            {
                get { return this.bindings; }
            }

            public string ActivationServiceName
            {
                get { return this.service.ServiceName; }
            }

            public string ProtocolName
            {
                get { return this.protocol; }
            }

            /// <summary>
            /// Creates a new group for <paramref name="app"/> and records it under its
            /// listener channel id.
            /// </summary>
            public IActivatedMessageQueue CreateQueue(ListenerAdapter la, App app)
            {
                QueueMonitorGroup qmg = new QueueMonitorGroup(this, la, app);
                lock (this.thisLock)
                {
                    this.groups[qmg.ListenerChannelContext.ListenerChannelId] = qmg;
                }
                return qmg;
            }

            /// <summary>
            /// Returns the group registered under <paramref name="queueId"/>, or null
            /// when there is none.
            /// </summary>
            public IActivatedMessageQueue FindQueue(int queueId)
            {
                lock (this.thisLock)
                {
                    QueueMonitorGroup group;
                    this.groups.TryGetValue(queueId, out group);
                    return group;
                }
            }

            public void StopService()
            {
                this.service.Stop();
            }

            /// <summary>Forgets <paramref name="qmg"/>.</summary>
            public void QueueMonitorGroupClosed(QueueMonitorGroup qmg)
            {
                lock (this.thisLock)
                {
                    this.groups.Remove(qmg.ListenerChannelContext.ListenerChannelId);
                }
            }
        }

        /// <summary>
        /// The set of <see cref="QueueMonitor"/> instances belonging to one App,
        /// together with the bookkeeping that decides when a message notification
        /// results in opening a listener channel instance.
        /// </summary>
        class QueueMonitorGroup : IActivatedMessageQueue
        {
            static int queueIdCounter = 0;
            static readonly TimeSpan RetryMonitorInterval = TimeSpan.FromMinutes(5);

            ActivationService activationService;
            App app;
            ActivationBindingFilter filter;
            ListenerAdapter listenerAdapter;
            int startQueueInstanceCount;
            ListenerChannelContext listenerChannelContext;
            List<QueueMonitor> monitors = new List<QueueMonitor>();
            List<QueueMonitor> failedMonitors = new List<QueueMonitor>();
            bool enabled;
            int pendingNotificationCount;
            IOThreadTimer retryTimer;
            bool retryScheduled = false;
            bool hasStartedQueueInstances;

            public QueueMonitorGroup(ActivationService activationService, ListenerAdapter la, App app)
            {
                this.activationService = activationService;
                this.listenerAdapter = la;
                this.app = app;
                this.startQueueInstanceCount = 1;
                this.listenerChannelContext = new ListenerChannelContext(app.AppKey,
                    Interlocked.Increment(ref queueIdCounter), Guid.Empty);

                this.pendingNotificationCount = 0;
                this.filter = new ActivationBindingFilter(this, app.Path);
                this.retryTimer = new IOThreadTimer(OnRetryTimer, null, false);
            }

            /// <summary>True when this group is enabled and its activation service is not paused.</summary>
            public bool CanDispatch
            {
                get { return this.enabled && !this.activationService.Paused; }
            }

            public App App
            {
                get { return this.app; }
            }

            public ListenerChannelContext ListenerChannelContext
            {
                get { return this.listenerChannelContext; }
            }

            bool IActivatedMessageQueue.HasStartedQueueInstances
            {
                get { return this.hasStartedQueueInstances; }
            }

            void IActivatedMessageQueue.OnQueueInstancesStopped()
            {
                this.hasStartedQueueInstances = false;
            }

            /// <summary>
            /// Removes this group from its activation service, then disposes all of its
            /// monitors and unregisters its binding filter.
            /// </summary>
            public void Delete()
            {
                this.activationService.QueueMonitorGroupClosed(this);
                UnregisterAll();
            }

            /// <summary>
            /// Consumes one pending notification by opening a listener channel instance;
            /// when none is pending, raises the instance budget by one and (re)starts
            /// every monitor.
            /// </summary>
            public void LaunchQueueInstance()
            {
                bool startInstance = false;

                lock (this)
                {
                    if (this.pendingNotificationCount > 0)
                    {
                        this.pendingNotificationCount--;
                        startInstance = true;
                    }
                    else
                    {
                        // start monitoring for new messages...
                        startQueueInstanceCount++;

                        // Make sure that everyone is peeking:
                        foreach (QueueMonitor monitor in this.monitors)
                        {
                            monitor.Start();
                        }
                    }
                }

                // The adapter call is made outside the lock.
                if (startInstance)
                {
                    if (this.listenerAdapter.OpenListenerChannelInstance(this))
                    {
                        this.hasStartedQueueInstances = true;
                    }
                }
            }

            /// <summary>
            /// Registers this group's binding filter for the host of <paramref name="url"/>.
            /// Always returns <c>ListenerExceptionStatus.Success</c>.
            /// </summary>
            public ListenerExceptionStatus Register(BaseUriWithWildcard url)
            {
                this.activationService.Bindings.RegisterBindingFilterIfNecessary(url.BaseAddress.Host, this.filter);
                return ListenerExceptionStatus.Success;
            }

            /// <summary>Starts every monitor, provided <see cref="CanDispatch"/> is true.</summary>
            public void Start()
            {
                lock (this)
                {
                    if (this.CanDispatch)
                    {
                        // Ensure that we're started...
                        foreach (QueueMonitor monitor in this.monitors)
                        {
                            monitor.Start();
                        }
                    }
                }
            }

            /// <summary>
            /// Updates the enabled flag; on an actual change, calls <see cref="Start"/>
            /// (which does nothing unless the group can now dispatch).
            /// </summary>
            public void SetEnabledState(bool enabled)
            {
                lock (this)
                {
                    if (this.enabled != enabled)
                    {
                        this.enabled = enabled;
                        Start();
                    }
                }
            }

            /// <summary>Disposes and forgets every monitor, then unregisters the binding filter.</summary>
            public void UnregisterAll()
            {
                lock (this)
                {
                    foreach (QueueMonitor monitor in this.monitors)
                    {
                        monitor.Dispose();
                    }

                    this.monitors.Clear();
                    this.activationService.Bindings.UnregisterBindingFilter(this.filter);
                }
            }

            /// <summary>
            /// Called by a monitor when a peek completes. Either records the
            /// notification as pending (cannot dispatch, or no instance budget left) or
            /// spends one unit of budget and opens a listener channel instance.
            /// </summary>
            /// <returns>
            /// True when budget remains after this notification, i.e. the caller should
            /// keep peeking; false otherwise.
            /// </returns>
            public bool NotifyMessageAvailable()
            {
                bool startInstance = false;
                bool shouldContinue = false;

                lock (this)
                {
                    if (!this.CanDispatch)
                    {
                        this.pendingNotificationCount++;
                    }
                    else if (this.startQueueInstanceCount == 0)
                    {
                        this.pendingNotificationCount++;
                    }
                    else
                    {
                        this.startQueueInstanceCount--;
                        startInstance = true;
                        shouldContinue = this.startQueueInstanceCount > 0;
                    }
                }

                if (startInstance)
                {
                    MsmqDiagnostics.StartingApplication(this.app.Path);
                    // NOTE(review): unlike LaunchQueueInstance, the result of
                    // OpenListenerChannelInstance is not consulted here - confirm intended.
                    this.listenerAdapter.OpenListenerChannelInstance(this);
                    this.hasStartedQueueInstances = true;
                }

                return shouldContinue;
            }

            /// <summary>
            /// Queues <paramref name="monitor"/> for a restart and arms the retry timer
            /// (RetryMonitorInterval) unless a retry is already scheduled.
            /// </summary>
            public void ScheduleRetry(QueueMonitor monitor)
            {
                lock (this)
                {
                    this.failedMonitors.Add(monitor);

                    if (!this.retryScheduled)
                    {
                        this.retryTimer.Set(RetryMonitorInterval);
                        this.retryScheduled = true;
                    }
                }
            }

            /// <summary>
            /// Creates a monitor for <paramref name="queue"/>, adds it to the group and
            /// starts it if the group is enabled. The monitor is returned as the opaque
            /// callback state later handed to <see cref="RemoveQueueFromGroup"/>.
            /// </summary>
            object AddQueueToGroup(Uri queue)
            {
                QueueMonitor monitor = null;

                lock (this)
                {
                    monitor = new QueueMonitor(queue, this);
                    this.monitors.Add(monitor);

                    if (this.enabled)
                    {
                        monitor.Start();
                    }
                }

                return monitor;
            }

            /// <summary>
            /// Retry timer callback: restarts the monitors that previously failed and
            /// are still owned by this group.
            /// </summary>
            void OnRetryTimer(object state)
            {
                lock (this)
                {
                    // Take the current failures and re-arm scheduling *before* restarting
                    // anything. QueueMonitor.Start() calls back into ScheduleRetry() on
                    // failure (same thread, re-entrant lock); that must be free to append
                    // to failedMonitors and set the timer again. Enumerating the live
                    // list would be invalidated by that append, clearing afterwards
                    // would discard the new failure, and leaving retryScheduled set would
                    // prevent any retry after the first one.
                    QueueMonitor[] retrying = this.failedMonitors.ToArray();
                    this.failedMonitors.Clear();
                    this.retryScheduled = false;

                    if (this.enabled)
                    {
                        foreach (QueueMonitor monitor in retrying)
                        {
                            // Only start it if we still own it...
                            if (this.monitors.Contains(monitor))
                            {
                                monitor.Start();
                            }
                        }
                    }
                }
            }

            /// <summary>
            /// Removes and disposes the monitor that <see cref="AddQueueToGroup"/>
            /// returned as <paramref name="state"/>.
            /// </summary>
            void RemoveQueueFromGroup(object state)
            {
                QueueMonitor monitor = (QueueMonitor)state;

                lock (this)
                {
                    this.monitors.Remove(monitor);
                    monitor.Dispose();
                }
            }

            // Note that we inherit from the transport binding filter here - that's not
            // a big deal, because we never need these uris to create services.
            class ActivationBindingFilter : MsmqBindingFilter
            {
                QueueMonitorGroup group;

                public ActivationBindingFilter(QueueMonitorGroup group, string path)
                    : base(path, MsmqUri.NetMsmqAddressTranslator)
                {
                    this.group = group;
                }

                public override object MatchFound(string host, string name, bool isPrivate)
                {
                    MsmqDiagnostics.MatchedApplicationFound(host, name, isPrivate, this.CanonicalPrefix);
                    return this.group.AddQueueToGroup(CreateServiceUri(host, name, isPrivate));
                }

                public override void MatchLost(string host, string name, bool isPrivate, object callbackState)
                {
                    this.group.RemoveQueueFromGroup(callbackState);
                }
            }
        }

        /// <summary>
        /// Watches a single queue with asynchronous peeks and reports each completed
        /// peek to its owning <see cref="QueueMonitorGroup"/>.
        /// </summary>
        class QueueMonitor : IDisposable
        {
            static readonly TimeSpan InfiniteTimeout = TimeSpan.FromMilliseconds(UInt32.MaxValue);

            bool disposed;
            QueueMonitorGroup group;
            bool peeking;
            string queueName;
            MQMessageQueue queue;

            public QueueMonitor(Uri uri, QueueMonitorGroup group)
            {
                // The defaults don't really matter here - we don't use
                // the buffer manager.
                this.group = group;
                this.queueName = MsmqFormatName.ToSystemMessagingQueueName(MsmqUri.UriToFormatNameByScheme(uri));
                this.peeking = false;
                Debug.Print("opening queue: " + this.queueName);
            }

            /// <summary>
            /// Lazily opens the queue and begins an asynchronous peek unless one is
            /// already outstanding. An MSMQ failure schedules a retry with the group.
            /// </summary>
            public void Start()
            {
                lock (this)
                {
                    try
                    {
                        if (this.queue == null)
                        {
                            this.queue = new MQMessageQueue(this.queueName);
                            // Only the lookup id is requested from peeked messages.
                            this.queue.MessageReadPropertyFilter.ClearAll();
                            this.queue.MessageReadPropertyFilter.LookupId = true;
                        }

                        if (!this.peeking)
                        {
                            this.peeking = true;
                            this.queue.BeginPeek(InfiniteTimeout, null,
                                DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnPeekCompleted)));
                        }
                    }
                    catch (MQException)
                    {
                        // Nothing in the try block can throw while a peek is outstanding,
                        // so reaching here means no peek is pending. Clear the flag, or
                        // the retried Start() would see peeking == true and do nothing.
                        this.peeking = false;
                        this.group.ScheduleRetry(this);
                    }
                }
            }

            /// <summary>Marks the monitor disposed and disposes the queue if it was opened.</summary>
            public void Dispose()
            {
                lock (this)
                {
                    this.disposed = true;
                    if (this.queue != null)
                    {
                        this.queue.Dispose();
                    }
                }
            }

            /// <summary>
            /// Peek completion callback: notifies the group and, when the group asks for
            /// it and the monitor is not disposed, begins the next peek.
            /// </summary>
            void OnPeekCompleted(IAsyncResult result)
            {
                bool shouldContinue = true;

                try
                {
                    // Only completion matters here; the peeked message itself is not used.
                    this.queue.EndPeek(result);

                    Debug.Print("MsmqActivation.QueueMonitor.OnPeekCompleted: message available");
                    shouldContinue = this.group.NotifyMessageAvailable();
                }
                catch (MQException ex)
                {
                    MsmqDiagnostics.CannotPeekOnQueue(this.queue.FormatName, ex);
                    MarkNotPeeking();
                    this.group.ScheduleRetry(this);
                    return;
                }
                catch (Exception ex)
                {
                    if (DiagnosticUtility.ShouldTraceError)
                    {
                        DiagnosticUtility.ExceptionUtility.TraceHandledException(ex, TraceEventType.Error);
                    }

                    if (!DiagnosticUtility.IsFatal(ex))
                    {
                        MarkNotPeeking();
                        this.group.ScheduleRetry(this);
                    }

                    throw;
                }

                lock (this)
                {
                    if (!this.disposed && shouldContinue)
                    {
                        this.queue.BeginPeek(InfiniteTimeout, null,
                            DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnPeekCompleted)));
                    }
                    else
                    {
                        this.peeking = false;
                    }
                }
            }

            // Records that no peek is outstanding so that a later Start() (from the retry
            // timer) actually issues a new BeginPeek. The monitor lock is released before
            // the caller goes on to ScheduleRetry, so this thread never holds the monitor
            // lock while waiting for the group lock.
            void MarkNotPeeking()
            {
                lock (this)
                {
                    this.peeking = false;
                }
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- DuplicateWaitObjectException.cs
- SmiEventSink_DeferedProcessing.cs
- EncryptedType.cs
- MobileErrorInfo.cs
- UIElementParaClient.cs
- InstanceKey.cs
- ContentHostHelper.cs
- SaveWorkflowAsyncResult.cs
- RequestQueryProcessor.cs
- ProgressBarBrushConverter.cs
- RemoteHelper.cs
- ObjectSet.cs
- HyperLinkDesigner.cs
- DisplayInformation.cs
- Context.cs
- WhiteSpaceTrimStringConverter.cs
- UnionCqlBlock.cs
- ToolTip.cs
- TextEffectCollection.cs
- PropertySourceInfo.cs
- _PooledStream.cs
- MethodRental.cs
- RoleServiceManager.cs
- NewArray.cs
- ErrorFormatterPage.cs
- TypographyProperties.cs
- MediaContextNotificationWindow.cs
- HexParser.cs
- Italic.cs
- MgmtConfigurationRecord.cs
- SqlDeflator.cs
- WrappingXamlSchemaContext.cs
- VarRemapper.cs
- RequestResponse.cs
- AVElementHelper.cs
- DbXmlEnabledProviderManifest.cs
- ApplicationContext.cs
- BrowserDefinition.cs
- CodeNamespace.cs
- CodeAccessPermission.cs
- WebEventTraceProvider.cs
- EventlogProvider.cs
- XPathChildIterator.cs
- ScriptIgnoreAttribute.cs
- SamlAudienceRestrictionCondition.cs
- CustomAttributeSerializer.cs
- BaseCodePageEncoding.cs
- LinearQuaternionKeyFrame.cs
- ItemChangedEventArgs.cs
- SecurityHelper.cs
- XmlBinaryWriterSession.cs
- TransformerInfo.cs
- ReferencedCategoriesDocument.cs
- WebPartConnectionsCancelVerb.cs
- DataSourceControlBuilder.cs
- SoapHeaders.cs
- ToolStripControlHost.cs
- BufferModesCollection.cs
- RuntimeCompatibilityAttribute.cs
- SystemInfo.cs
- HostingEnvironmentSection.cs
- NetworkInterface.cs
- FormViewModeEventArgs.cs
- BooleanExpr.cs
- XhtmlBasicTextViewAdapter.cs
- MenuItem.cs
- ConfigurationErrorsException.cs
- ProcessModule.cs
- EntityCollection.cs
- PersonalizationStateQuery.cs
- NamespaceEmitter.cs
- SparseMemoryStream.cs
- CompositeCollectionView.cs
- CorrelationManager.cs
- TableRow.cs
- Point.cs
- DataControlImageButton.cs
- XmlReflectionImporter.cs
- WebHttpSecurityModeHelper.cs
- SystemColors.cs
- NetworkStream.cs
- Annotation.cs
- ToolBar.cs
- InternalControlCollection.cs
- Zone.cs
- DesignerDataSchemaClass.cs
- ArgumentDesigner.xaml.cs
- SolidColorBrush.cs
- TextEditorThreadLocalStore.cs
- ViewStateModeByIdAttribute.cs
- CodeIdentifiers.cs
- TypeToken.cs
- WebControlAdapter.cs
- CodeThrowExceptionStatement.cs
- Keyboard.cs
- IIS7WorkerRequest.cs
- DropDownList.cs
- StorageEndPropertyMapping.cs
- ELinqQueryState.cs
- DataConnectionHelper.cs