Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / SMSvcHost / System / ServiceModel / Activation / MsmqActivation.cs / 1 / MsmqActivation.cs
//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
namespace System.ServiceModel.Activation
{
    using System.ServiceModel.Activation.Diagnostics;
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Net;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.ServiceProcess;
    using System.Threading;
    using System.Messaging;
    using System.ServiceModel.Diagnostics;

    using MQMessageQueue = System.Messaging.MessageQueue;
    using MQMessage = System.Messaging.Message;
    using MQException = System.Messaging.MessageQueueException;

    // Windows service that owns two ActivationService/ListenerAdapter pairs: one for the
    // scheme of MsmqUri.FormatNameAddressTranslator ("integration") and one for the scheme
    // of MsmqUri.NetMsmqAddressTranslator ("transport"). Both share one BindingsManager.
    class MsmqActivation : ServiceBase
    {
        BindingsManager bindings;
        ActivationService integrationActivationService;
        ListenerAdapter integrationListenerAdapter;
        ActivationService transportActivationService;
        ListenerAdapter transportListenerAdapter;

        // Configures the service-control capabilities and creates the shared bindings
        // manager plus one activation service per scheme. Listener adapters are not
        // created here; that happens in Start().
        public MsmqActivation()
        {
            ServiceName = ListenerConstants.MsmqActivationServiceName;
            CanHandlePowerEvent = false;
            AutoLog = false;
            CanStop = true;
            CanPauseAndContinue = true;
            CanShutdown = true;

            this.bindings = new BindingsManager();
            this.integrationActivationService = new ActivationService(this, MsmqUri.FormatNameAddressTranslator.Scheme);
            this.transportActivationService = new ActivationService(this, MsmqUri.NetMsmqAddressTranslator.Scheme);
        }

        // Traces the start request and runs Start(). Any exception is written to the
        // event log and then rethrown. In DEBUG builds the start may be moved to a
        // separate thread when DebuggableService.DelayStart returns true.
        protected override void OnStart(string[] args)
        {
            try
            {
                if (DiagnosticUtility.ShouldTraceInformation)
                {
                    ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceStart, this);
                }

#if DEBUG
                if (DebuggableService.DelayStart(ServiceName))
                {
                    (new Thread(new ThreadStart(Start))).Start();
                    return;
                }
#endif
                Start();
            }
            catch (Exception exception)
            {
                // Log the error to eventlog.
                ListenerTraceUtility.EventLog.LogEvent(TraceEventType.Error,
                    EventLogCategory.ListenerAdapter,
                    EventLogEventId.ServiceStartFailed,
                    false,
                    exception.ToString());

                throw;
            }
        }

        // Traces the stop request and closes both listener adapters.
        protected override void OnStop()
        {
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceStop, this);
            }

            Shutdown();
        }

        // Traces the continue request and clears the paused flag on both activation services.
        protected override void OnContinue()
        {
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceContinue, this);
            }

            this.integrationActivationService.Paused = false;
            this.transportActivationService.Paused = false;
        }

        // Traces the pause request and sets the paused flag on both activation services.
        protected override void OnPause()
        {
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServicePause, this);
            }

            this.integrationActivationService.Paused = true;
            this.transportActivationService.Paused = true;
        }

        // Traces the shutdown request, closes the listener adapters and then asks the
        // service to stop.
        protected override void OnShutdown()
        {
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceShutdown, this);
            }

            Shutdown();
            Stop();
        }

        // Creates and opens one ListenerAdapter per activation service.
        // Throws NotSupportedException (after setting ExitCode to ERROR_NOT_SUPPORTED)
        // when SMSvcHost.IsWebhostSupported is false.
        void Start()
        {
#if DEBUG
            DebuggableService.WaitForDebugger(ServiceName);
#endif
            if (!SMSvcHost.IsWebhostSupported)
            {
                const int ERROR_NOT_SUPPORTED = 50;
                this.ExitCode = ERROR_NOT_SUPPORTED;
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ServiceRequiresWas)));
            }

            this.integrationListenerAdapter = new ListenerAdapter(this.integrationActivationService);
            this.transportListenerAdapter = new ListenerAdapter(this.transportActivationService);

            this.integrationListenerAdapter.Open();
            this.transportListenerAdapter.Open();
        }

        // Closes whichever listener adapters currently exist and forgets them.
        // The fields are cleared first so that a second call is a no-op: OnShutdown calls
        // Shutdown() and then Stop(), and Stop() is expected to run OnStop (which calls
        // Shutdown() again). The null checks also cover the case where Start() never got
        // as far as creating the adapters.
        void Shutdown()
        {
            ListenerAdapter integrationAdapter = this.integrationListenerAdapter;
            ListenerAdapter transportAdapter = this.transportListenerAdapter;
            this.integrationListenerAdapter = null;
            this.transportListenerAdapter = null;

            try
            {
                if (integrationAdapter != null)
                {
                    integrationAdapter.Close();
                }
            }
            finally
            {
                // Close the second adapter even when closing the first one throws.
                if (transportAdapter != null)
                {
                    transportAdapter.Close();
                }
            }
        }

        // Keeps one MsmqBindingMonitor per host name (compared with OrdinalIgnoreCase)
        // and routes binding filters to them.
        class BindingsManager
        {
            Dictionary<string, MsmqBindingMonitor> bindingMonitors;
            object thisLock = new object();

            public BindingsManager()
            {
                this.bindingMonitors = new Dictionary<string, MsmqBindingMonitor>(StringComparer.OrdinalIgnoreCase);
            }

            // Ensures a monitor exists (and has been opened) for the host, then adds the
            // filter to that monitor unless the monitor already contains it.
            public void RegisterBindingFilterIfNecessary(string host, MsmqBindingFilter filter)
            {
                lock (this.thisLock)
                {
                    MsmqBindingMonitor bindingMonitor;
                    if (!this.bindingMonitors.TryGetValue(host, out bindingMonitor))
                    {
                        bindingMonitor = new MsmqBindingMonitor(host);
                        this.bindingMonitors.Add(host, bindingMonitor);
                        bindingMonitor.Open();
                    }

                    // register the new filter if it doesn't already exist:
                    if (!bindingMonitor.ContainsFilter(filter))
                    {
                        bindingMonitor.AddFilter(filter);
                    }
                }
            }

            // Asks every known monitor to remove the filter.
            public void UnregisterBindingFilter(MsmqBindingFilter filter)
            {
                lock (this.thisLock)
                {
                    foreach (MsmqBindingMonitor monitor in this.bindingMonitors.Values)
                    {
                        monitor.RemoveFilter(filter);
                    }
                }
            }
        }

        // IActivationService implementation for one protocol scheme. Tracks the
        // QueueMonitorGroups it has created, keyed by listener channel id.
        class ActivationService : IActivationService
        {
            Dictionary<int, QueueMonitorGroup> groups;
            string protocol;
            BindingsManager bindings;
            object thisLock = new object();
            ServiceBase service;
            bool paused;

            public ActivationService(MsmqActivation service, string protocol)
            {
                this.protocol = protocol;
                this.bindings = service.bindings;
                this.service = service;
                this.paused = false;
                this.groups = new Dictionary<int, QueueMonitorGroup>();
            }

            // Paused flag. When the value changes to false, every registered group is
            // asked to Start().
            public bool Paused
            {
                get
                {
                    return this.paused;
                }
                set
                {
                    // 'groups' is mutated under thisLock by CreateQueue and
                    // QueueMonitorGroupClosed, so it must be enumerated under the same
                    // lock; locking a different object would allow the dictionary to
                    // change during the foreach.
                    lock (this.thisLock)
                    {
                        if (this.paused != value)
                        {
                            this.paused = value;

                            if (!this.paused)
                            {
                                foreach (QueueMonitorGroup group in this.groups.Values)
                                {
                                    group.Start();
                                }
                            }
                        }
                    }
                }
            }

            public BindingsManager Bindings
            {
                get { return this.bindings; }
            }

            public string ActivationServiceName
            {
                get { return this.service.ServiceName; }
            }

            public string ProtocolName
            {
                get { return this.protocol; }
            }

            // Creates a QueueMonitorGroup for the app and records it under its
            // listener channel id.
            public IActivatedMessageQueue CreateQueue(ListenerAdapter la, App app)
            {
                QueueMonitorGroup qmg = new QueueMonitorGroup(this, la, app);
                lock (this.thisLock)
                {
                    this.groups[qmg.ListenerChannelContext.ListenerChannelId] = qmg;
                }
                return qmg;
            }

            // Returns the group recorded under queueId, or null when there is none.
            public IActivatedMessageQueue FindQueue(int queueId)
            {
                lock (this.thisLock)
                {
                    QueueMonitorGroup group;
                    this.groups.TryGetValue(queueId, out group);
                    return group;
                }
            }

            public void StopService()
            {
                this.service.Stop();
            }

            // Removes the group from the id table.
            public void QueueMonitorGroupClosed(QueueMonitorGroup qmg)
            {
                lock (this.thisLock)
                {
                    this.groups.Remove(qmg.ListenerChannelContext.ListenerChannelId);
                }
            }
        }

        // The set of QueueMonitors that belong to one App. Decides, when a monitor
        // reports a message, whether to open a listener channel instance or to remember
        // the notification for later.
        class QueueMonitorGroup : IActivatedMessageQueue
        {
            static int queueIdCounter = 0;
            static readonly TimeSpan RetryMonitorInterval = TimeSpan.FromMinutes(5);

            ActivationService activationService;
            App app;
            ActivationBindingFilter filter;
            ListenerAdapter listenerAdapter;
            // Incremented by LaunchQueueInstance, decremented by NotifyMessageAvailable
            // each time it opens a listener channel instance.
            int startQueueInstanceCount;
            ListenerChannelContext listenerChannelContext;
            List<QueueMonitor> monitors = new List<QueueMonitor>();
            // Monitors waiting for the next retry timer tick.
            List<QueueMonitor> failedMonitors = new List<QueueMonitor>();
            bool enabled;
            // Notifications that arrived while no instance could be started.
            int pendingNotificationCount;
            IOThreadTimer retryTimer;
            // True while retryTimer has been set and OnRetryTimer has not run yet.
            bool retryScheduled = false;
            bool hasStartedQueueInstances;

            public QueueMonitorGroup(ActivationService activationService, ListenerAdapter la, App app)
            {
                this.activationService = activationService;
                this.listenerAdapter = la;
                this.app = app;
                this.startQueueInstanceCount = 1;
                this.listenerChannelContext = new ListenerChannelContext(app.AppKey,
                    Interlocked.Increment(ref queueIdCounter), Guid.Empty);

                this.pendingNotificationCount = 0;
                this.filter = new ActivationBindingFilter(this, app.Path);
                this.retryTimer = new IOThreadTimer(OnRetryTimer, null, false);
            }

            // True when this group is enabled and the owning activation service is not paused.
            public bool CanDispatch
            {
                get { return this.enabled && !this.activationService.Paused; }
            }

            public App App
            {
                get { return this.app; }
            }

            public ListenerChannelContext ListenerChannelContext
            {
                get { return this.listenerChannelContext; }
            }

            bool IActivatedMessageQueue.HasStartedQueueInstances
            {
                get { return this.hasStartedQueueInstances; }
            }

            void IActivatedMessageQueue.OnQueueInstancesStopped()
            {
                this.hasStartedQueueInstances = false;
            }

            // Removes this group from the activation service, then disposes all monitors
            // and unregisters the binding filter.
            public void Delete()
            {
                this.activationService.QueueMonitorGroupClosed(this);
                UnregisterAll();
            }

            // If a notification is pending, consumes it and opens a listener channel
            // instance. Otherwise raises startQueueInstanceCount and (re)starts every
            // monitor so the next message triggers an instance.
            public void LaunchQueueInstance()
            {
                bool startInstance = false;

                lock (this)
                {
                    if (this.pendingNotificationCount > 0)
                    {
                        this.pendingNotificationCount--;
                        startInstance = true;
                    }
                    else
                    {
                        // start monitoring for new messages...
                        this.startQueueInstanceCount++;

                        // Make sure that everyone is peeking:
                        foreach (QueueMonitor monitor in this.monitors)
                        {
                            monitor.Start();
                        }
                    }
                }

                // The listener adapter is called outside the lock.
                if (startInstance)
                {
                    if (this.listenerAdapter.OpenListenerChannelInstance(this))
                    {
                        this.hasStartedQueueInstances = true;
                    }
                }
            }

            // Registers this group's binding filter for the host of the given url.
            // Always returns ListenerExceptionStatus.Success.
            public ListenerExceptionStatus Register(BaseUriWithWildcard url)
            {
                this.activationService.Bindings.RegisterBindingFilterIfNecessary(url.BaseAddress.Host, this.filter);
                return ListenerExceptionStatus.Success;
            }

            // Starts every monitor, but only when CanDispatch is true.
            public void Start()
            {
                lock (this)
                {
                    if (this.CanDispatch)
                    {
                        // Ensure that we're started...
                        foreach (QueueMonitor monitor in this.monitors)
                        {
                            monitor.Start();
                        }
                    }
                }
            }

            // Records the enabled state; on a change, calls Start() (which itself does
            // nothing unless CanDispatch is true).
            public void SetEnabledState(bool enabled)
            {
                lock (this)
                {
                    if (this.enabled != enabled)
                    {
                        this.enabled = enabled;
                        Start();
                    }
                }
            }

            // Disposes and forgets every monitor and unregisters the binding filter.
            public void UnregisterAll()
            {
                lock (this)
                {
                    foreach (QueueMonitor monitor in this.monitors)
                    {
                        monitor.Dispose();
                    }

                    this.monitors.Clear();

                    this.activationService.Bindings.UnregisterBindingFilter(this.filter);
                }
            }

            // Called by a monitor when a peek completes. Either opens a listener channel
            // instance (consuming one unit of startQueueInstanceCount) or records a
            // pending notification. Returns true when the caller should keep peeking,
            // i.e. when startQueueInstanceCount is still above zero after this call.
            public bool NotifyMessageAvailable()
            {
                bool startInstance = false;
                bool shouldContinue = false;

                lock (this)
                {
                    if (!this.CanDispatch)
                    {
                        this.pendingNotificationCount++;
                    }
                    else if (this.startQueueInstanceCount == 0)
                    {
                        this.pendingNotificationCount++;
                    }
                    else
                    {
                        this.startQueueInstanceCount--;
                        startInstance = true;
                        shouldContinue = this.startQueueInstanceCount > 0;
                    }
                }

                if (startInstance)
                {
                    MsmqDiagnostics.StartingApplication(this.app.Path);
                    // NOTE(review): unlike LaunchQueueInstance, the result of
                    // OpenListenerChannelInstance is ignored here and the flag is set
                    // unconditionally - confirm whether that difference is intended.
                    this.listenerAdapter.OpenListenerChannelInstance(this);
                    this.hasStartedQueueInstances = true;
                }

                return shouldContinue;
            }

            // Queues the monitor for the next retry pass and arms the retry timer
            // (RetryMonitorInterval) if it is not already armed.
            public void ScheduleRetry(QueueMonitor monitor)
            {
                lock (this)
                {
                    this.failedMonitors.Add(monitor);

                    if (!this.retryScheduled)
                    {
                        this.retryTimer.Set(RetryMonitorInterval);
                        this.retryScheduled = true;
                    }
                }
            }

            // Creates a monitor for the queue uri, adds it to the group and starts it
            // when the group is enabled. The monitor is returned as the callback state
            // that RemoveQueueFromGroup later receives.
            object AddQueueToGroup(Uri queue)
            {
                QueueMonitor monitor = null;

                lock (this)
                {
                    monitor = new QueueMonitor(queue, this);
                    this.monitors.Add(monitor);
                    if (this.enabled)
                    {
                        monitor.Start();
                    }
                }

                return monitor;
            }

            // Retry timer callback: restarts the monitors queued by ScheduleRetry.
            void OnRetryTimer(object state)
            {
                lock (this)
                {
                    // Take a snapshot and reset the bookkeeping BEFORE restarting anything:
                    //  - monitor.Start() calls ScheduleRetry on failure (same thread, so the
                    //    lock is re-entered), which would otherwise modify failedMonitors
                    //    while it is being enumerated;
                    //  - retryScheduled must go back to false, otherwise ScheduleRetry would
                    //    never arm the timer again after the first pass.
                    QueueMonitor[] monitorsToRetry = this.failedMonitors.ToArray();
                    this.failedMonitors.Clear();
                    this.retryScheduled = false;

                    if (this.enabled)
                    {
                        foreach (QueueMonitor monitor in monitorsToRetry)
                        {
                            // Only start it if we still own it...
                            if (this.monitors.Contains(monitor))
                            {
                                monitor.Start();
                            }
                        }
                    }
                }
            }

            // Removes and disposes the monitor that AddQueueToGroup handed out as state.
            void RemoveQueueFromGroup(object state)
            {
                QueueMonitor monitor = (QueueMonitor)state;
                lock (this)
                {
                    this.monitors.Remove(monitor);
                    monitor.Dispose();
                }
            }

            // Note that we inherit from the transport binding filter here - that's not
            // a big deal, because we never need these uris to create services.
            class ActivationBindingFilter : MsmqBindingFilter
            {
                QueueMonitorGroup group;

                public ActivationBindingFilter(QueueMonitorGroup group, string path)
                    : base(path, MsmqUri.NetMsmqAddressTranslator)
                {
                    this.group = group;
                }

                // Adds a monitor for the matched queue to the group; the returned monitor
                // is the callback state later passed to MatchLost.
                public override object MatchFound(string host, string name, bool isPrivate)
                {
                    MsmqDiagnostics.MatchedApplicationFound(host, name, isPrivate, this.CanonicalPrefix);
                    return this.group.AddQueueToGroup(CreateServiceUri(host, name, isPrivate));
                }

                // Removes the monitor that MatchFound created.
                public override void MatchLost(string host, string name, bool isPrivate, object callbackState)
                {
                    this.group.RemoveQueueFromGroup(callbackState);
                }
            }
        }

        // Watches a single MSMQ queue with an asynchronous peek and reports arriving
        // messages to its QueueMonitorGroup.
        class QueueMonitor : IDisposable
        {
            static readonly TimeSpan InfiniteTimeout = TimeSpan.FromMilliseconds(UInt32.MaxValue);

            bool disposed;
            QueueMonitorGroup group;
            // True while an asynchronous peek is believed to be outstanding.
            bool peeking;
            string queueName;
            MQMessageQueue queue;

            // Only computes the queue name; the queue itself is opened lazily in Start().
            public QueueMonitor(Uri uri, QueueMonitorGroup group)
            {
                // The defaults don't really matter here - we don't use
                // the buffer manager.
                this.group = group;
                this.queueName = MsmqFormatName.ToSystemMessagingQueueName(MsmqUri.UriToFormatNameByScheme(uri));
                this.peeking = false;
                Debug.Print("opening queue: " + this.queueName);
            }

            // Opens the queue on first use and begins an asynchronous peek unless one is
            // already outstanding. On MessageQueueException the monitor is handed to the
            // group for a later retry.
            public void Start()
            {
                lock (this)
                {
                    try
                    {
                        if (this.queue == null)
                        {
                            this.queue = new MQMessageQueue(this.queueName);
                            this.queue.MessageReadPropertyFilter.ClearAll();
                            this.queue.MessageReadPropertyFilter.LookupId = true;
                        }

                        if (!this.peeking)
                        {
                            this.peeking = true;
                            this.queue.BeginPeek(InfiniteTimeout, null, DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnPeekCompleted)));
                        }
                    }
                    catch (MQException)
                    {
                        // No peek is outstanding after a failure. Without this reset the
                        // retry's Start() call would see peeking == true and do nothing.
                        this.peeking = false;
                        this.group.ScheduleRetry(this);
                    }
                }
            }

            // Marks the monitor disposed and disposes the queue if it was opened.
            public void Dispose()
            {
                lock (this)
                {
                    this.disposed = true;
                    if (this.queue != null)
                    {
                        this.queue.Dispose();
                    }
                }
            }

            // Peek completion callback. Notifies the group and, when the group asks for
            // it and the monitor is not disposed, begins the next peek. Failures are
            // handed to the group for retry; non-MessageQueueException errors are
            // traced and rethrown.
            void OnPeekCompleted(IAsyncResult result)
            {
                bool shouldContinue = true;

                try
                {
                    MQMessage message = this.queue.EndPeek(result);
                    // Only the arrival matters here; release the peeked message right away.
                    if (message != null)
                    {
                        message.Dispose();
                    }

                    Debug.Print("MsmqActivation.QueueMonitor.OnPeekCompleted: message available");
                    shouldContinue = this.group.NotifyMessageAvailable();
                }
                catch (MQException ex)
                {
                    // The peek is over; clear the flag so the scheduled retry can start a new one.
                    MarkNotPeeking();
                    MsmqDiagnostics.CannotPeekOnQueue(this.queue.FormatName, ex);
                    this.group.ScheduleRetry(this);
                    return;
                }
                catch (Exception ex)
                {
                    if (DiagnosticUtility.ShouldTraceError)
                    {
                        DiagnosticUtility.ExceptionUtility.TraceHandledException(ex, TraceEventType.Error);
                    }

                    if (!DiagnosticUtility.IsFatal(ex))
                    {
                        MarkNotPeeking();
                        this.group.ScheduleRetry(this);
                    }

                    throw;
                }

                bool retryNeeded = false;

                lock (this)
                {
                    if (!this.disposed && shouldContinue)
                    {
                        try
                        {
                            this.queue.BeginPeek(InfiniteTimeout, null, DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnPeekCompleted)));
                        }
                        catch (MQException)
                        {
                            // Same recovery as Start(): no peek is outstanding, retry later.
                            this.peeking = false;
                            retryNeeded = true;
                        }
                    }
                    else
                    {
                        this.peeking = false;
                    }
                }

                // ScheduleRetry takes the group's lock. It is called after this monitor's
                // lock has been released because the group calls monitor.Start() (which
                // takes the monitor's lock) while holding its own lock.
                if (retryNeeded)
                {
                    this.group.ScheduleRetry(this);
                }
            }

            // Records, under the monitor lock, that no peek is outstanding.
            void MarkNotPeeking()
            {
                lock (this)
                {
                    this.peeking = false;
                }
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- OutputChannelBinder.cs
- AdornerHitTestResult.cs
- OleDbStruct.cs
- StopStoryboard.cs
- IisHelper.cs
- Dictionary.cs
- ToolStripItemRenderEventArgs.cs
- SmtpSpecifiedPickupDirectoryElement.cs
- _IPv4Address.cs
- Underline.cs
- Font.cs
- StandardCommandToolStripMenuItem.cs
- IntellisenseTextBox.cs
- MenuStrip.cs
- DataGridRowHeader.cs
- WebConfigurationHostFileChange.cs
- ResourceReferenceExpression.cs
- ServiceHostingEnvironment.cs
- HelpEvent.cs
- ControlSerializer.cs
- AnimationTimeline.cs
- IriParsingElement.cs
- PreProcessor.cs
- DbReferenceCollection.cs
- QfeChecker.cs
- CdpEqualityComparer.cs
- ProtocolsConfigurationEntry.cs
- KeyManager.cs
- ProgressPage.cs
- SecurityMode.cs
- OperationResponse.cs
- SrgsRuleRef.cs
- PixelShader.cs
- FocusTracker.cs
- DbProviderConfigurationHandler.cs
- ExpandedProjectionNode.cs
- EventToken.cs
- MembershipSection.cs
- ThreadLocal.cs
- DataGridParentRows.cs
- ItemCheckEvent.cs
- Formatter.cs
- PointAnimationUsingKeyFrames.cs
- DataObjectMethodAttribute.cs
- AttachmentCollection.cs
- Timer.cs
- CalendarTable.cs
- TextRenderer.cs
- DataObjectPastingEventArgs.cs
- MemberInitExpression.cs
- HttpCachePolicyElement.cs
- HttpResponseInternalWrapper.cs
- StreamWithDictionary.cs
- BuildProvider.cs
- SafeIUnknown.cs
- RijndaelManagedTransform.cs
- AuthenticatingEventArgs.cs
- ToolStripItemCollection.cs
- RangeValueProviderWrapper.cs
- ComponentChangingEvent.cs
- DynamicDataRoute.cs
- TraceLevelStore.cs
- TextureBrush.cs
- DataRowCollection.cs
- DomNameTable.cs
- ApplicationServiceManager.cs
- LockRecursionException.cs
- SelectionProcessor.cs
- CachingHintValidation.cs
- SqlBulkCopy.cs
- shaper.cs
- RC2.cs
- TypeSchema.cs
- PrimitiveRenderer.cs
- XPathNodeIterator.cs
- SocketException.cs
- DataGridViewBand.cs
- ImpersonationContext.cs
- ResourceReader.cs
- Pair.cs
- WebPartTransformerAttribute.cs
- GeneratedContractType.cs
- ConstructorArgumentAttribute.cs
- ArraySet.cs
- LocalBuilder.cs
- HyperLink.cs
- SmtpFailedRecipientsException.cs
- Util.cs
- DetailsViewPagerRow.cs
- SQLInt32.cs
- WindowsListViewGroup.cs
- Listbox.cs
- MenuItemStyle.cs
- KoreanLunisolarCalendar.cs
- Processor.cs
- VoiceChangeEventArgs.cs
- OutOfMemoryException.cs
- HttpCacheParams.cs
- MetadataItem.cs
- LexicalChunk.cs