Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / Msmq4PoisonHandler.cs / 1 / Msmq4PoisonHandler.cs
//------------------------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------------------------- namespace System.ServiceModel.Channels { using System.Collections.Generic; using System.IO; using System.Threading; using System.Transactions; sealed class Msmq4PoisonHandler : IPoisonHandlingStrategy { MsmqQueue mainQueue; MsmqQueue mainQueueForMove; MsmqQueue retryQueueForPeek; MsmqQueue retryQueueForMove; MsmqQueue poisonQueue; IOThreadTimer timer; MsmqReceiveHelper receiver; bool disposed; string poisonQueueName; string retryQueueName; string mainQueueName; MsmqRetryQueueMessage retryQueueMessage; static WaitCallback onStartPeek = new WaitCallback(StartPeek); static AsyncCallback onPeekCompleted = DiagnosticUtility.ThunkAsyncCallback(OnPeekCompleted); public Msmq4PoisonHandler(MsmqReceiveHelper receiver) { this.receiver = receiver; this.timer = new IOThreadTimer(OnTimer, null, false); this.disposed = false; this.mainQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(this.ListenUri); this.poisonQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(new Uri(this.ListenUri.AbsoluteUri + ";poison")); this.retryQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(new Uri(this.ListenUri.AbsoluteUri + ";retry")); } MsmqReceiveParameters ReceiveParameters { get { return this.receiver.MsmqReceiveParameters; } } Uri ListenUri { get { return this.receiver.ListenUri; } } public void Open() { this.mainQueue = this.receiver.Queue; this.mainQueueForMove = new MsmqQueue(this.mainQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS); // Open up the poison queue (for handling poison messages). this.poisonQueue = new MsmqQueue(this.poisonQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS); this.retryQueueForMove = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS); this.retryQueueForPeek = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_RECEIVE_ACCESS); this.retryQueueMessage = new MsmqRetryQueueMessage(); if (Thread.CurrentThread.IsThreadPoolThread) StartPeek(this); else IOThreadScheduler.ScheduleCallback(Msmq4PoisonHandler.onStartPeek, this); } static void StartPeek(object state) { Msmq4PoisonHandler handler = state as Msmq4PoisonHandler; lock(handler) { if(! handler.disposed) { handler.retryQueueForPeek.BeginPeek(handler.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, handler); } } } public bool CheckAndHandlePoisonMessage(MsmqMessageProperty messageProperty) { if (messageProperty.AbortCount <= this.ReceiveParameters.ReceiveRetryCount) return false; int retryCycle = messageProperty.MoveCount / 2; lock(this) { if (this.disposed) throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(this.GetType().ToString())); if (retryCycle >= this.ReceiveParameters.MaxRetryCycles) { FinalDisposition(messageProperty); } else { MsmqReceiveHelper.MoveReceivedMessage(this.mainQueue, this.retryQueueForMove, messageProperty.LookupId); MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, false, this.receiver.InstanceId); } } return true; } public void FinalDisposition(MsmqMessageProperty messageProperty) { switch (this.ReceiveParameters.ReceiveErrorHandling) { case ReceiveErrorHandling.Drop: this.receiver.DropOrRejectReceivedMessage(messageProperty, false); break; case ReceiveErrorHandling.Fault: MsmqReceiveHelper.TryAbortTransactionCurrent(); if (null != this.receiver.ChannelListener) this.receiver.ChannelListener.FaultListener(); if (null != this.receiver.Channel) this.receiver.Channel.FaultChannel(); break; case ReceiveErrorHandling.Reject: this.receiver.DropOrRejectReceivedMessage(messageProperty, true); MsmqDiagnostics.PoisonMessageRejected(messageProperty.MessageId, this.receiver.InstanceId); break; case ReceiveErrorHandling.Move: MsmqReceiveHelper.MoveReceivedMessage(this.mainQueue, this.poisonQueue, messageProperty.LookupId); MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, true, this.receiver.InstanceId); break; default: DiagnosticUtility.DebugAssert("System.ServiceModel.Channels.Msmq4PoisonHandler.FinalDisposition(): (unexpected ReceiveErrorHandling)"); break; } } public void Dispose() { lock(this) { if(!this.disposed) { this.disposed = true; this.timer.Cancel(); if (null != this.retryQueueForPeek) this.retryQueueForPeek.Dispose(); if (null != this.retryQueueForMove) this.retryQueueForMove.Dispose(); if (null != this.poisonQueue) this.poisonQueue.Dispose(); if (null != this.mainQueueForMove) this.mainQueueForMove.Dispose(); } } } static void OnPeekCompleted(IAsyncResult result) { Msmq4PoisonHandler handler = result.AsyncState as Msmq4PoisonHandler; MsmqQueue.ReceiveResult receiveResult = MsmqQueue.ReceiveResult.Unknown; try { receiveResult = handler.retryQueueForPeek.EndPeek(result); } catch (MsmqException ex) { MsmqDiagnostics.ExpectedException(ex); } if (MsmqQueue.ReceiveResult.MessageReceived == receiveResult) { lock(handler) { if(!handler.disposed) { // Check the time - move it, and begin peeking again // if necessary, or wait for the timeout. DateTime lastMoveTime = MsmqDateTime.ToDateTime(handler.retryQueueMessage.LastMoveTime.Value); TimeSpan waitTime = lastMoveTime + handler.ReceiveParameters.RetryCycleDelay - DateTime.UtcNow; if (waitTime < TimeSpan.Zero) handler.OnTimer(handler); else handler.timer.Set(waitTime); } } } } void OnTimer(object state) { lock(this) { if(!this.disposed) { try { this.retryQueueForPeek.TryMoveMessage(this.retryQueueMessage.LookupId.Value, this.mainQueueForMove, MsmqTransactionMode.Single); } catch (MsmqException ex) { MsmqDiagnostics.ExpectedException(ex); } this.retryQueueForPeek.BeginPeek(this.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, this); } } } class MsmqRetryQueueMessage : NativeMsmqMessage { LongProperty lookupId; IntProperty lastMoveTime; public MsmqRetryQueueMessage() : base(2) { this.lookupId = new LongProperty(this, UnsafeNativeMethods.PROPID_M_LOOKUPID); this.lastMoveTime = new IntProperty(this, UnsafeNativeMethods.PROPID_M_LAST_MOVE_TIME); } public LongProperty LookupId { get { return this.lookupId; } } public IntProperty LastMoveTime { get { return this.lastMoveTime; } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- MailDefinition.cs
- DataObjectFieldAttribute.cs
- UndirectedGraph.cs
- RemoteWebConfigurationHost.cs
- AssemblyGen.cs
- XmlAnyElementAttributes.cs
- AuthenticationModuleElementCollection.cs
- Rectangle.cs
- JsonServiceDocumentSerializer.cs
- SamlDelegatingWriter.cs
- MarginsConverter.cs
- MappingModelBuildProvider.cs
- MouseCaptureWithinProperty.cs
- XmlBinaryReaderSession.cs
- odbcmetadatafactory.cs
- SevenBitStream.cs
- ActiveXContainer.cs
- WebBrowserSiteBase.cs
- AsyncStreamReader.cs
- DescendantOverDescendantQuery.cs
- WebBrowserPermission.cs
- SiteMapHierarchicalDataSourceView.cs
- BooleanAnimationUsingKeyFrames.cs
- MetaType.cs
- TextRunProperties.cs
- OdbcConnectionString.cs
- DataGridViewCheckBoxCell.cs
- ConsoleTraceListener.cs
- __TransparentProxy.cs
- TextChange.cs
- QueryOptionExpression.cs
- SortedDictionary.cs
- EventSinkActivity.cs
- SimpleType.cs
- MSAAWinEventWrap.cs
- MediaElement.cs
- HostDesigntimeLicenseContext.cs
- RowParagraph.cs
- PointCollection.cs
- UInt64Storage.cs
- DependencyObjectPropertyDescriptor.cs
- DataGridAutoFormat.cs
- StorageFunctionMapping.cs
- MaxSessionCountExceededException.cs
- List.cs
- DataStreamFromComStream.cs
- TextEditorSelection.cs
- CancelEventArgs.cs
- SmuggledIUnknown.cs
- ResourceDisplayNameAttribute.cs
- DataGridViewCheckBoxCell.cs
- CheckBoxStandardAdapter.cs
- ObjectCache.cs
- _KerberosClient.cs
- Vector3DAnimationBase.cs
- ConsoleEntryPoint.cs
- CommonGetThemePartSize.cs
- webclient.cs
- XmlMemberMapping.cs
- Clipboard.cs
- ProtocolsConfigurationHandler.cs
- EditorZone.cs
- TypeContext.cs
- EventSourceCreationData.cs
- ColorAnimationUsingKeyFrames.cs
- Point3DValueSerializer.cs
- RawStylusSystemGestureInputReport.cs
- Selection.cs
- LambdaCompiler.Binary.cs
- UnsafeNativeMethods.cs
- BrushMappingModeValidation.cs
- TextCharacters.cs
- SerializationInfoEnumerator.cs
- RawUIStateInputReport.cs
- AbandonedMutexException.cs
- UriSectionData.cs
- SqlReferenceCollection.cs
- CompilerLocalReference.cs
- OledbConnectionStringbuilder.cs
- NameValuePair.cs
- RSAPKCS1SignatureDeformatter.cs
- Animatable.cs
- ToolStripRenderEventArgs.cs
- ConstantSlot.cs
- RuleCache.cs
- VolatileEnlistmentMultiplexing.cs
- Timer.cs
- ScrollBar.cs
- StyleHelper.cs
- DataFormats.cs
- XmlSchemaSimpleType.cs
- MenuAdapter.cs
- LostFocusEventManager.cs
- HighlightComponent.cs
- BinHexDecoder.cs
- Literal.cs
- CompositeActivityDesigner.cs
- JournalNavigationScope.cs
- EntityDataSourceDataSelectionPanel.cs
- DoWorkEventArgs.cs