Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / MsmqReceiveHelper.cs / 1 / MsmqReceiveHelper.cs
//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------- namespace System.ServiceModel.Channels { using System.ServiceModel.Diagnostics; using System.Threading; using System.Transactions; sealed class MsmqReceiveHelper { IPoisonHandlingStrategy poisonHandler; string queueName; MsmqQueue queue; MsmqReceiveParameters receiveParameters; Uri uri; string instanceId; IMsmqMessagePool pool; MsmqInputChannelBase channel; MsmqChannelListenerBase listener; ServiceModelActivity activity; internal MsmqReceiveHelper(MsmqReceiveParameters receiveParameters, Uri uri, IMsmqMessagePool messagePool, MsmqInputChannelBase channel, MsmqChannelListenerBase listener) { this.queueName = receiveParameters.AddressTranslator.UriToFormatName(uri); this.receiveParameters = receiveParameters; this.uri = uri; this.queue = new MsmqQueue(this.receiveParameters.AddressTranslator.UriToFormatName(uri), UnsafeNativeMethods.MQ_RECEIVE_ACCESS); this.instanceId = uri.ToString().ToUpperInvariant(); this.pool = messagePool; this.poisonHandler = Msmq.CreatePoisonHandler(this); this.channel = channel; this.listener = listener; } internal ServiceModelActivity Activity { get { return this.activity; } } IPoisonHandlingStrategy PoisonHandler { get { return this.poisonHandler; } } internal MsmqReceiveParameters MsmqReceiveParameters { get { return this.receiveParameters; } } internal MsmqInputChannelBase Channel { get { return this.channel; } } internal MsmqChannelListenerBase ChannelListener { get { return this.listener; } } internal Uri ListenUri { get { return this.uri; } } internal string InstanceId { get { return this.instanceId; } } internal MsmqQueue Queue { get { return this.queue; } } internal bool Transactional { get { return this.receiveParameters.ExactlyOnce; } } internal void Open() { this.activity = MsmqDiagnostics.StartListenAtActivity(this); using (MsmqDiagnostics.BoundOpenOperation(this)) { this.queue.EnsureOpen(); this.poisonHandler.Open(); } } internal void Close() { using (ServiceModelActivity.BoundOperation(this.Activity)) { this.poisonHandler.Dispose(); this.queue.Dispose(); } ServiceModelActivity.Stop(this.activity); } internal MsmqInputMessage TakeMessage() { return this.pool.TakeMessage(); } internal void ReturnMessage(MsmqInputMessage message) { this.pool.ReturnMessage(message); } internal static void TryAbortTransactionCurrent() { if (null != Transaction.Current) { try { Transaction.Current.Rollback(); } catch (TransactionAbortedException ex) { MsmqDiagnostics.ExpectedException(ex); } catch (ObjectDisposedException ex) { MsmqDiagnostics.ExpectedException(ex); } } } internal void DropOrRejectReceivedMessage(MsmqMessageProperty messageProperty, bool reject) { if (this.Transactional) { TryAbortTransactionCurrent(); IPostRollbackErrorStrategy postRollback = new SimplePostRollbackErrorStrategy(messageProperty.LookupId); MsmqQueue.MoveReceiveResult result = MsmqQueue.MoveReceiveResult.Unknown; do { using (MsmqEmptyMessage emptyMessage = new MsmqEmptyMessage()) { using (TransactionScope scope = new TransactionScope(TransactionScopeOption.RequiresNew)) { result = this.Queue.TryReceiveByLookupId(messageProperty.LookupId, emptyMessage, MsmqTransactionMode.CurrentOrThrow); if (MsmqQueue.MoveReceiveResult.Succeeded == result && reject) this.Queue.MarkMessageRejected(messageProperty.LookupId); scope.Complete(); } } if (result == MsmqQueue.MoveReceiveResult.Succeeded) MsmqDiagnostics.MessageConsumed(instanceId, messageProperty.MessageId, Msmq.IsRejectMessageSupported); if (result != MsmqQueue.MoveReceiveResult.MessageLockedUnderTransaction) break; } while (postRollback.AnotherTryNeeded()); } else { MsmqDiagnostics.MessageConsumed(instanceId, messageProperty.MessageId, false); } } // internal static void MoveReceivedMessage(MsmqQueue queueFrom, MsmqQueue queueTo, long lookupId) { TryAbortTransactionCurrent(); IPostRollbackErrorStrategy postRollback = new SimplePostRollbackErrorStrategy(lookupId); MsmqQueue.MoveReceiveResult result = MsmqQueue.MoveReceiveResult.Unknown; do { result = queueFrom.TryMoveMessage(lookupId, queueTo, MsmqTransactionMode.Single); if (result != MsmqQueue.MoveReceiveResult.MessageLockedUnderTransaction) break; } while (postRollback.AnotherTryNeeded()); } internal void FinalDisposition(MsmqMessageProperty messageProperty) { this.poisonHandler.FinalDisposition(messageProperty); } // WaitForMessage internal bool WaitForMessage(TimeSpan timeout) { using (MsmqEmptyMessage message = new MsmqEmptyMessage()) { return (MsmqQueue.ReceiveResult.Timeout != this.queue.TryPeek(message, timeout)); } } // internal IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state) { return new WaitForMessageAsyncResult(this.queue, timeout, callback, state); } // public bool EndWaitForMessage(IAsyncResult result) { return WaitForMessageAsyncResult.End(result); } internal bool TryReceive(MsmqInputMessage msmqMessage, TimeSpan timeout, MsmqTransactionMode transactionMode, out MsmqMessageProperty property) { property = null; MsmqQueue.ReceiveResult receiveResult = this.Queue.TryReceive(msmqMessage, timeout, transactionMode); if (MsmqQueue.ReceiveResult.OperationCancelled == receiveResult) return true; if (MsmqQueue.ReceiveResult.Timeout == receiveResult) return false; else { property = new MsmqMessageProperty(msmqMessage); if (this.Transactional) { if (this.PoisonHandler.CheckAndHandlePoisonMessage(property)) { long lookupId = property.LookupId; property = null; throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new MsmqPoisonMessageException(lookupId)); } } return true; } } // internal IAsyncResult BeginTryReceive(MsmqInputMessage msmqMessage, TimeSpan timeout, MsmqTransactionMode transactionMode, AsyncCallback callback, object state) { if (this.receiveParameters.ExactlyOnce) return new TryTransactedReceiveAsyncResult(this, msmqMessage, timeout, transactionMode, callback, state); else return new TryNonTransactedReceiveAsyncResult(this, msmqMessage, timeout, callback, state); } // internal bool EndTryReceive(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty msmqProperty) { msmqMessage = null; msmqProperty = null; if (null == result) throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result"); if (this.receiveParameters.ExactlyOnce) { TryTransactedReceiveAsyncResult receiveResult = result as TryTransactedReceiveAsyncResult; if (null == receiveResult) throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument(SR.GetString(SR.InvalidAsyncResult)); return TryTransactedReceiveAsyncResult.End(receiveResult, out msmqMessage, out msmqProperty); } else { TryNonTransactedReceiveAsyncResult receiveResult = result as TryNonTransactedReceiveAsyncResult; if (null == receiveResult) throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument(SR.GetString(SR.InvalidAsyncResult)); return TryNonTransactedReceiveAsyncResult.End(receiveResult, out msmqMessage, out msmqProperty); } } // TryReceiveAsyncResult (tx version) class TryTransactedReceiveAsyncResult : AsyncResult { bool expired; MsmqReceiveHelper receiver; TimeoutHelper timeoutHelper; Transaction txCurrent; MsmqInputMessage msmqMessage; MsmqMessageProperty messageProperty; MsmqTransactionMode transactionMode; static WaitCallback onComplete = OnComplete; internal TryTransactedReceiveAsyncResult( MsmqReceiveHelper receiver, MsmqInputMessage msmqMessage, TimeSpan timeout, MsmqTransactionMode transactionMode, AsyncCallback callback, object state) : base(callback, state) { this.timeoutHelper = new TimeoutHelper(timeout); this.txCurrent = Transaction.Current; this.receiver = receiver; this.msmqMessage = msmqMessage; this.transactionMode = transactionMode; IOThreadScheduler.ScheduleCallback(onComplete, this); } static void OnComplete(object parameter) { TryTransactedReceiveAsyncResult result = parameter as TryTransactedReceiveAsyncResult; Transaction savedTransaction = Transaction.Current; Transaction.Current = result.txCurrent; try { Exception ex = null; try { result.expired = ! result.receiver.TryReceive(result.msmqMessage, result.timeoutHelper.RemainingTime(), result.transactionMode, out result.messageProperty); } catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; ex = e; } result.Complete(false, ex); } finally { Transaction.Current = savedTransaction; } } internal static bool End(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty property) { TryTransactedReceiveAsyncResult receiveResult = AsyncResult.End(result); msmqMessage = receiveResult.msmqMessage; property = receiveResult.messageProperty; return ! receiveResult.expired; } } // TryReceiveAsyncResult (non-tx version) class TryNonTransactedReceiveAsyncResult : AsyncResult { MsmqQueue.ReceiveResult receiveResult; MsmqReceiveHelper receiver; MsmqInputMessage msmqMessage; static AsyncCallback onCompleteStatic = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnCompleteStatic)); internal TryNonTransactedReceiveAsyncResult(MsmqReceiveHelper receiver, MsmqInputMessage msmqMessage, TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { this.receiver = receiver; this.msmqMessage = msmqMessage; receiver.Queue.BeginTryReceive(msmqMessage, timeout, onCompleteStatic, this); } static void OnCompleteStatic(IAsyncResult result) { (result.AsyncState as TryNonTransactedReceiveAsyncResult).OnComplete(result); } void OnComplete(IAsyncResult result) { Exception ex = null; try { receiveResult = receiver.Queue.EndTryReceive(result); } catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; ex = e; } Complete(result.CompletedSynchronously, ex); } internal static bool End(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty property) { TryNonTransactedReceiveAsyncResult asyncResult = AsyncResult.End (result); msmqMessage = asyncResult.msmqMessage; property = null; if (MsmqQueue.ReceiveResult.Timeout == asyncResult.receiveResult) return false; else if (MsmqQueue.ReceiveResult.OperationCancelled == asyncResult.receiveResult) return true; else { property = new MsmqMessageProperty(msmqMessage); return true; } } } // WaitForMessageAsyncResult class WaitForMessageAsyncResult : TypedAsyncResult { MsmqQueue msmqQueue; MsmqEmptyMessage msmqMessage; static AsyncCallback onCompleteStatic = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnCompleteStatic)); public WaitForMessageAsyncResult(MsmqQueue msmqQueue, TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { this.msmqMessage = new MsmqEmptyMessage(); this.msmqQueue = msmqQueue; this.msmqQueue.BeginPeek(this.msmqMessage, timeout, onCompleteStatic, this); } static void OnCompleteStatic(IAsyncResult result) { ((WaitForMessageAsyncResult)result.AsyncState).OnComplete(result); } void OnComplete(IAsyncResult result) { this.msmqMessage.Dispose(); MsmqQueue.ReceiveResult receiveResult = MsmqQueue.ReceiveResult.Unknown; Exception ex = null; try { receiveResult = this.msmqQueue.EndPeek(result); } catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; ex = e; } if (null == ex) Complete(receiveResult != MsmqQueue.ReceiveResult.Timeout, result.CompletedSynchronously); else Complete(result.CompletedSynchronously, ex); } } // PostRollbackErrorStrategy interface IPostRollbackErrorStrategy { bool AnotherTryNeeded(); } // SimplePostRollbackErrorStrategy class SimplePostRollbackErrorStrategy : IPostRollbackErrorStrategy { const int Attempts = 50; const int MillisecondsToSleep = 100; int attemptsLeft = Attempts; long lookupId; internal SimplePostRollbackErrorStrategy(long lookupId) { this.lookupId = lookupId; } bool IPostRollbackErrorStrategy.AnotherTryNeeded() { if (--this.attemptsLeft > 0) { if (attemptsLeft == (Attempts - 1)) MsmqDiagnostics.MessageLockedUnderTheTransaction(lookupId); Thread.Sleep(TimeSpan.FromMilliseconds(MillisecondsToSleep)); return true; } else { MsmqDiagnostics.MoveOrDeleteAttemptFailed(lookupId); return false; } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- SqlCaseSimplifier.cs
- DesignerAttributeInfo.cs
- PropertyTab.cs
- SmiTypedGetterSetter.cs
- ExtensibleClassFactory.cs
- UnsafeNativeMethods.cs
- AspNetCacheProfileAttribute.cs
- Lookup.cs
- LineGeometry.cs
- PathFigureCollection.cs
- AddingNewEventArgs.cs
- SafeRsaProviderHandle.cs
- InkPresenterAutomationPeer.cs
- TreeBuilderXamlTranslator.cs
- XPathScanner.cs
- ListControl.cs
- CompositeFontFamily.cs
- ProfileParameter.cs
- SyndicationDeserializer.cs
- LightweightEntityWrapper.cs
- DesignRelation.cs
- MultiBindingExpression.cs
- WorkflowTimerService.cs
- ValueExpressions.cs
- ClientRolePrincipal.cs
- SafeNativeMethodsMilCoreApi.cs
- PermissionSetTriple.cs
- CheckBoxStandardAdapter.cs
- PageThemeCodeDomTreeGenerator.cs
- HostSecurityManager.cs
- InvalidCardException.cs
- NavigationService.cs
- MethodRental.cs
- PreviewPrintController.cs
- DataGridViewLinkCell.cs
- HtmlSelect.cs
- ProtocolsConfiguration.cs
- LineUtil.cs
- PolicyValidator.cs
- ImageBrush.cs
- LocatorPart.cs
- StringUtil.cs
- MulticastNotSupportedException.cs
- NeutralResourcesLanguageAttribute.cs
- ProfileGroupSettings.cs
- HashCodeCombiner.cs
- OverflowException.cs
- TreeNode.cs
- CommandLibraryHelper.cs
- NumberSubstitution.cs
- WebRequestModuleElementCollection.cs
- RequestStatusBarUpdateEventArgs.cs
- ValidateNames.cs
- TypeSystemHelpers.cs
- SplitterPanel.cs
- MetafileHeader.cs
- ProfileBuildProvider.cs
- EqualityArray.cs
- ScriptManager.cs
- DataGridLength.cs
- WebPermission.cs
- XmlSortKey.cs
- RelationalExpressions.cs
- ScriptMethodAttribute.cs
- InputScopeConverter.cs
- ResourceDefaultValueAttribute.cs
- CollectionBuilder.cs
- ServiceDescription.cs
- GridViewCancelEditEventArgs.cs
- ConfigViewGenerator.cs
- HttpCacheVary.cs
- ApplicationSettingsBase.cs
- PageRouteHandler.cs
- SchemaImporterExtensionElementCollection.cs
- SystemTcpStatistics.cs
- SchemaName.cs
- FilterQueryOptionExpression.cs
- HeaderCollection.cs
- MemberCollection.cs
- ScriptingSectionGroup.cs
- SqlEnums.cs
- columnmapkeybuilder.cs
- ConnectionPointCookie.cs
- RewritingProcessor.cs
- IxmlLineInfo.cs
- WebPart.cs
- ZipIORawDataFileBlock.cs
- DependencyPropertyChangedEventArgs.cs
- CompiledQuery.cs
- ImageDrawing.cs
- SynchronizedInputHelper.cs
- ListViewItem.cs
- EndpointIdentity.cs
- EntityDesignerUtils.cs
- SpellCheck.cs
- Relationship.cs
- XmlWriter.cs
- AddInProcess.cs
- _HelperAsyncResults.cs
- FixedDSBuilder.cs