Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / Msmq4PoisonHandler.cs / 1 / Msmq4PoisonHandler.cs
//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//-----------------------------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System.Collections.Generic;
    using System.IO;
    using System.Threading;
    using System.Transactions;

    // Poison-message handling strategy built on three queues derived from the
    // receiver's listen URI: the main queue, a ";retry" queue and a ";poison"
    // queue.
    //
    // Messages whose abort count exceeds ReceiveRetryCount are moved to the retry
    // queue; a background peek on the retry queue moves them back to the main
    // queue once RetryCycleDelay has elapsed since their last move. After
    // MaxRetryCycles cycles the configured ReceiveErrorHandling decides the
    // message's final disposition.
    sealed class Msmq4PoisonHandler : IPoisonHandlingStrategy
    {
        // Private monitor guarding 'disposed' and all queue/timer operations.
        // A dedicated object is used instead of 'this' so that no code outside
        // this class can contend on (or deadlock against) the handler's lock.
        readonly object thisLock = new object();

        // Queue handles. All are assigned in Open(); mainQueue is owned by the
        // receiver, the rest are created (and later disposed) by this handler.
        MsmqQueue mainQueue;
        MsmqQueue mainQueueForMove;
        MsmqQueue retryQueueForPeek;
        MsmqQueue retryQueueForMove;
        MsmqQueue poisonQueue;

        readonly IOThreadTimer timer;
        readonly MsmqReceiveHelper receiver;
        bool disposed;

        // Format names computed once from the listen URI in the constructor.
        readonly string poisonQueueName;
        readonly string retryQueueName;
        readonly string mainQueueName;

        // Single message buffer reused by every peek on the retry queue.
        MsmqRetryQueueMessage retryQueueMessage;

        static readonly WaitCallback onStartPeek = new WaitCallback(StartPeek);
        static readonly AsyncCallback onPeekCompleted = DiagnosticUtility.ThunkAsyncCallback(OnPeekCompleted);

        // Captures the receiver, creates the (not yet armed) retry timer and
        // translates the listen URI plus its ";poison" / ";retry" variants into
        // format names. No queue is opened until Open() is called.
        public Msmq4PoisonHandler(MsmqReceiveHelper receiver)
        {
            // receiver must be assigned first: ReceiveParameters and ListenUri read it.
            this.receiver = receiver;
            this.timer = new IOThreadTimer(OnTimer, null, false);
            this.disposed = false;
            this.mainQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(this.ListenUri);
            this.poisonQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(new Uri(this.ListenUri.AbsoluteUri + ";poison"));
            this.retryQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(new Uri(this.ListenUri.AbsoluteUri + ";retry"));
        }

        MsmqReceiveParameters ReceiveParameters
        {
            get { return this.receiver.MsmqReceiveParameters; }
        }

        Uri ListenUri
        {
            get { return this.receiver.ListenUri; }
        }

        // Opens the queue handles used for moving and peeking and starts the
        // first asynchronous peek on the retry queue. The peek is started inline
        // when already on a thread-pool thread, otherwise it is scheduled through
        // IOThreadScheduler.
        public void Open()
        {
            this.mainQueue = this.receiver.Queue;
            this.mainQueueForMove = new MsmqQueue(this.mainQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS);
            // Open up the poison queue (for handling poison messages).
            this.poisonQueue = new MsmqQueue(this.poisonQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS);
            this.retryQueueForMove = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS);
            this.retryQueueForPeek = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_RECEIVE_ACCESS);
            this.retryQueueMessage = new MsmqRetryQueueMessage();

            if (Thread.CurrentThread.IsThreadPoolThread)
            {
                StartPeek(this);
            }
            else
            {
                IOThreadScheduler.ScheduleCallback(Msmq4PoisonHandler.onStartPeek, this);
            }
        }

        // WaitCallback target: begins an unbounded-timeout peek on the retry
        // queue unless the handler has already been disposed.
        // 'state' must be the Msmq4PoisonHandler instance.
        static void StartPeek(object state)
        {
            // Direct cast: a wrong-typed state fails here instead of surfacing
            // later as a NullReferenceException.
            Msmq4PoisonHandler handler = (Msmq4PoisonHandler)state;
            lock (handler.thisLock)
            {
                if (!handler.disposed)
                {
                    handler.retryQueueForPeek.BeginPeek(handler.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, handler);
                }
            }
        }

        // Returns false when the message's abort count is still within
        // ReceiveRetryCount (nothing is done). Otherwise handles the message and
        // returns true: it is either moved to the retry queue, or - once
        // MaxRetryCycles has been reached - given its final disposition.
        // Throws ObjectDisposedException if the handler has been disposed.
        public bool CheckAndHandlePoisonMessage(MsmqMessageProperty messageProperty)
        {
            if (messageProperty.AbortCount <= this.ReceiveParameters.ReceiveRetryCount)
            {
                return false;
            }

            // NOTE(review): presumably each retry cycle accounts for two moves
            // (main -> retry and retry -> main), hence the division - confirm.
            int retryCycle = messageProperty.MoveCount / 2;

            lock (this.thisLock)
            {
                if (this.disposed)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(this.GetType().ToString()));
                }

                if (retryCycle >= this.ReceiveParameters.MaxRetryCycles)
                {
                    FinalDisposition(messageProperty);
                }
                else
                {
                    MsmqReceiveHelper.MoveReceivedMessage(this.mainQueue, this.retryQueueForMove, messageProperty.LookupId);
                    MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, false, this.receiver.InstanceId);
                }
            }

            return true;
        }

        // Applies the configured ReceiveErrorHandling to a message:
        //   Drop   - drop the received message;
        //   Fault  - try to abort the current transaction, then fault the
        //            listener and/or channel when present;
        //   Reject - reject the received message and trace it;
        //   Move   - move the message to the poison queue and trace it.
        public void FinalDisposition(MsmqMessageProperty messageProperty)
        {
            switch (this.ReceiveParameters.ReceiveErrorHandling)
            {
                case ReceiveErrorHandling.Drop:
                    this.receiver.DropOrRejectReceivedMessage(messageProperty, false);
                    break;

                case ReceiveErrorHandling.Fault:
                    MsmqReceiveHelper.TryAbortTransactionCurrent();
                    if (null != this.receiver.ChannelListener)
                    {
                        this.receiver.ChannelListener.FaultListener();
                    }
                    if (null != this.receiver.Channel)
                    {
                        this.receiver.Channel.FaultChannel();
                    }
                    break;

                case ReceiveErrorHandling.Reject:
                    this.receiver.DropOrRejectReceivedMessage(messageProperty, true);
                    MsmqDiagnostics.PoisonMessageRejected(messageProperty.MessageId, this.receiver.InstanceId);
                    break;

                case ReceiveErrorHandling.Move:
                    MsmqReceiveHelper.MoveReceivedMessage(this.mainQueue, this.poisonQueue, messageProperty.LookupId);
                    MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, true, this.receiver.InstanceId);
                    break;

                default:
                    DiagnosticUtility.DebugAssert("System.ServiceModel.Channels.Msmq4PoisonHandler.FinalDisposition(): (unexpected ReceiveErrorHandling)");
                    break;
            }
        }

        // Marks the handler disposed, cancels the retry timer and disposes every
        // queue handle this handler created. Safe to call more than once; the
        // receiver-owned mainQueue is not disposed here.
        public void Dispose()
        {
            lock (this.thisLock)
            {
                if (!this.disposed)
                {
                    this.disposed = true;
                    this.timer.Cancel();

                    // Queues are null if Open() was never called (or failed part-way).
                    if (null != this.retryQueueForPeek)
                    {
                        this.retryQueueForPeek.Dispose();
                    }
                    if (null != this.retryQueueForMove)
                    {
                        this.retryQueueForMove.Dispose();
                    }
                    if (null != this.poisonQueue)
                    {
                        this.poisonQueue.Dispose();
                    }
                    if (null != this.mainQueueForMove)
                    {
                        this.mainQueueForMove.Dispose();
                    }
                }
            }
        }

        // Completion callback for the retry-queue peek. When a message was
        // peeked, either moves it back immediately (its delay has already
        // elapsed) or arms the timer for the remaining wait. An MsmqException
        // from EndPeek is reported as an expected exception and no further peek
        // is started from here.
        static void OnPeekCompleted(IAsyncResult result)
        {
            Msmq4PoisonHandler handler = (Msmq4PoisonHandler)result.AsyncState;
            MsmqQueue.ReceiveResult receiveResult = MsmqQueue.ReceiveResult.Unknown;

            try
            {
                receiveResult = handler.retryQueueForPeek.EndPeek(result);
            }
            catch (MsmqException ex)
            {
                MsmqDiagnostics.ExpectedException(ex);
            }

            if (MsmqQueue.ReceiveResult.MessageReceived == receiveResult)
            {
                lock (handler.thisLock)
                {
                    if (!handler.disposed)
                    {
                        // Check the time - move it, and begin peeking again
                        // if necessary, or wait for the timeout.
                        DateTime lastMoveTime = MsmqDateTime.ToDateTime(handler.retryQueueMessage.LastMoveTime.Value);
                        TimeSpan waitTime = lastMoveTime + handler.ReceiveParameters.RetryCycleDelay - DateTime.UtcNow;

                        if (waitTime < TimeSpan.Zero)
                        {
                            // Re-enters thisLock on the same thread; Monitor locks are reentrant.
                            handler.OnTimer(handler);
                        }
                        else
                        {
                            handler.timer.Set(waitTime);
                        }
                    }
                }
            }
        }

        // Timer callback (also invoked directly from OnPeekCompleted): tries to
        // move the most recently peeked retry-queue message back to the main
        // queue, then starts the next peek. 'state' is not used.
        void OnTimer(object state)
        {
            lock (this.thisLock)
            {
                if (!this.disposed)
                {
                    try
                    {
                        this.retryQueueForPeek.TryMoveMessage(this.retryQueueMessage.LookupId.Value, this.mainQueueForMove, MsmqTransactionMode.Single);
                    }
                    catch (MsmqException ex)
                    {
                        MsmqDiagnostics.ExpectedException(ex);
                    }

                    this.retryQueueForPeek.BeginPeek(this.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, this);
                }
            }
        }

        // Native message buffer requesting only the two properties the retry
        // logic reads: the lookup id and the last-move time.
        class MsmqRetryQueueMessage : NativeMsmqMessage
        {
            LongProperty lookupId;
            IntProperty lastMoveTime;

            // base(2): two properties are registered below.
            public MsmqRetryQueueMessage()
                : base(2)
            {
                this.lookupId = new LongProperty(this, UnsafeNativeMethods.PROPID_M_LOOKUPID);
                this.lastMoveTime = new IntProperty(this, UnsafeNativeMethods.PROPID_M_LAST_MOVE_TIME);
            }

            public LongProperty LookupId
            {
                get { return this.lookupId; }
            }

            public IntProperty LastMoveTime
            {
                get { return this.lastMoveTime; }
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- Point4DConverter.cs
- ConnectionStringSettings.cs
- SpeakProgressEventArgs.cs
- AccessViolationException.cs
- XmlComplianceUtil.cs
- IPAddressCollection.cs
- HealthMonitoringSectionHelper.cs
- ClientEventManager.cs
- QueryContinueDragEventArgs.cs
- BidirectionalDictionary.cs
- XamlFilter.cs
- BaseDataBoundControl.cs
- ObjectListDataBindEventArgs.cs
- ConnectionStringsSection.cs
- JavaScriptObjectDeserializer.cs
- HashAlgorithm.cs
- NonSerializedAttribute.cs
- ProviderIncompatibleException.cs
- EventLog.cs
- MgmtResManager.cs
- EntityReference.cs
- Canvas.cs
- Point3DAnimationUsingKeyFrames.cs
- SharedPerformanceCounter.cs
- IteratorFilter.cs
- FontNameConverter.cs
- ObjectKeyFrameCollection.cs
- ArglessEventHandlerProxy.cs
- ObjectResult.cs
- DefaultCommandConverter.cs
- ThrowHelper.cs
- AxWrapperGen.cs
- MimePart.cs
- TextBox.cs
- GeneralTransform3DTo2D.cs
- DataTableCollection.cs
- ZipFileInfoCollection.cs
- RowCache.cs
- MimeXmlReflector.cs
- DesignerRegionCollection.cs
- XmlAggregates.cs
- GenericUriParser.cs
- InstalledVoice.cs
- AuthenticationSchemesHelper.cs
- NamespaceQuery.cs
- ObjectItemNoOpAssemblyLoader.cs
- WinFormsSecurity.cs
- LockedHandleGlyph.cs
- SmiEventSink_DeferedProcessing.cs
- thaishape.cs
- WebPartConnectionsCloseVerb.cs
- ContextStack.cs
- ServiceAuthorizationElement.cs
- MatrixUtil.cs
- Margins.cs
- MouseButton.cs
- OSFeature.cs
- SqlStream.cs
- ImageEditor.cs
- HtmlInputText.cs
- CodeDelegateCreateExpression.cs
- BitmapEffectGroup.cs
- SqlReferenceCollection.cs
- PrinterUnitConvert.cs
- ConstructorBuilder.cs
- Point.cs
- dtdvalidator.cs
- BindingGroup.cs
- ProcessModule.cs
- Misc.cs
- DataGridViewCellMouseEventArgs.cs
- DependencyProperty.cs
- OdbcReferenceCollection.cs
- IdlingCommunicationPool.cs
- ButtonFieldBase.cs
- XmlNamespaceMappingCollection.cs
- WebControlAdapter.cs
- APCustomTypeDescriptor.cs
- HttpRequest.cs
- ProviderCommandInfoUtils.cs
- SoundPlayer.cs
- PcmConverter.cs
- EntityCommandDefinition.cs
- TypeLoadException.cs
- ComponentCollection.cs
- COM2IDispatchConverter.cs
- ToolboxComponentsCreatedEventArgs.cs
- JsonXmlDataContract.cs
- ContainerParagraph.cs
- ToggleButtonAutomationPeer.cs
- NamespaceDecl.cs
- TaiwanCalendar.cs
- Axis.cs
- RegexMatchCollection.cs
- ColorAnimationBase.cs
- Message.cs
- UriTemplateTrieNode.cs
- PaperSource.cs
- Codec.cs
- Preprocessor.cs