Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / MsmqReceiveHelper.cs / 1 / MsmqReceiveHelper.cs
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System.ServiceModel.Diagnostics;
    using System.Threading;
    using System.Transactions;

    /// <summary>
    /// Wraps an <see cref="MsmqQueue"/> opened for receive access and exposes the
    /// synchronous and asynchronous wait/receive operations used by the MSMQ input
    /// channels, including poison-message checks for transactional (ExactlyOnce) queues.
    /// </summary>
    sealed class MsmqReceiveHelper
    {
        IPoisonHandlingStrategy poisonHandler;
        string queueName;
        MsmqQueue queue;
        MsmqReceiveParameters receiveParameters;
        Uri uri;
        string instanceId;
        IMsmqMessagePool pool;
        MsmqInputChannelBase channel;
        MsmqChannelListenerBase listener;
        ServiceModelActivity activity;

        /// <summary>
        /// Creates the helper for the queue addressed by <paramref name="uri"/>.
        /// The queue object is constructed here but only opened by <see cref="Open"/>.
        /// </summary>
        /// <param name="receiveParameters">Receive settings; supplies the address translator and the ExactlyOnce flag.</param>
        /// <param name="uri">Listen URI; translated to an MSMQ format name.</param>
        /// <param name="messagePool">Pool used by <see cref="TakeMessage"/> and <see cref="ReturnMessage"/>.</param>
        /// <param name="channel">Owning channel, exposed through <see cref="Channel"/>.</param>
        /// <param name="listener">Owning listener, exposed through <see cref="ChannelListener"/>.</param>
        internal MsmqReceiveHelper(MsmqReceiveParameters receiveParameters, Uri uri, IMsmqMessagePool messagePool, MsmqInputChannelBase channel, MsmqChannelListenerBase listener)
        {
            this.queueName = receiveParameters.AddressTranslator.UriToFormatName(uri);
            this.receiveParameters = receiveParameters;
            this.uri = uri;
            // Reuse the format name computed above instead of translating the URI a second time.
            this.queue = new MsmqQueue(this.queueName, UnsafeNativeMethods.MQ_RECEIVE_ACCESS);
            this.instanceId = uri.ToString().ToUpperInvariant();
            this.pool = messagePool;
            // NOTE(review): the poison handler is created before channel/listener are assigned;
            // presumably CreatePoisonHandler does not read them during construction - confirm.
            this.poisonHandler = Msmq.CreatePoisonHandler(this);
            this.channel = channel;
            this.listener = listener;
        }

        /// <summary>Activity started by <see cref="Open"/>; null until then.</summary>
        internal ServiceModelActivity Activity
        {
            get { return this.activity; }
        }

        IPoisonHandlingStrategy PoisonHandler
        {
            get { return this.poisonHandler; }
        }

        internal MsmqReceiveParameters MsmqReceiveParameters
        {
            get { return this.receiveParameters; }
        }

        internal MsmqInputChannelBase Channel
        {
            get { return this.channel; }
        }

        internal MsmqChannelListenerBase ChannelListener
        {
            get { return this.listener; }
        }

        internal Uri ListenUri
        {
            get { return this.uri; }
        }

        /// <summary>Upper-cased (invariant) string form of the listen URI.</summary>
        internal string InstanceId
        {
            get { return this.instanceId; }
        }

        internal MsmqQueue Queue
        {
            get { return this.queue; }
        }

        /// <summary>True when the receive parameters specify ExactlyOnce.</summary>
        internal bool Transactional
        {
            get { return this.receiveParameters.ExactlyOnce; }
        }

        /// <summary>
        /// Starts the listen activity, then opens the queue and the poison handler
        /// inside the bound open-operation scope.
        /// </summary>
        internal void Open()
        {
            this.activity = MsmqDiagnostics.StartListenAtActivity(this);
            using (MsmqDiagnostics.BoundOpenOperation(this))
            {
                this.queue.EnsureOpen();
                this.poisonHandler.Open();
            }
        }

        /// <summary>
        /// Disposes the poison handler and the queue, then stops the listen activity.
        /// The queue is disposed and the activity stopped even if an earlier step throws.
        /// </summary>
        internal void Close()
        {
            try
            {
                using (ServiceModelActivity.BoundOperation(this.Activity))
                {
                    try
                    {
                        this.poisonHandler.Dispose();
                    }
                    finally
                    {
                        // Do not leak the queue if the poison handler fails to dispose.
                        this.queue.Dispose();
                    }
                }
            }
            finally
            {
                ServiceModelActivity.Stop(this.activity);
            }
        }

        /// <summary>Takes a message buffer from the pool.</summary>
        internal MsmqInputMessage TakeMessage()
        {
            return this.pool.TakeMessage();
        }

        /// <summary>Returns a message buffer to the pool.</summary>
        internal void ReturnMessage(MsmqInputMessage message)
        {
            this.pool.ReturnMessage(message);
        }

        /// <summary>
        /// Rolls back the ambient transaction, if there is one. An already-aborted or
        /// disposed transaction is traced as an expected exception and otherwise ignored.
        /// </summary>
        internal static void TryAbortTransactionCurrent()
        {
            Transaction ambient = Transaction.Current;
            if (null != ambient)
            {
                try
                {
                    ambient.Rollback();
                }
                catch (TransactionAbortedException ex)
                {
                    MsmqDiagnostics.ExpectedException(ex);
                }
                catch (ObjectDisposedException ex)
                {
                    MsmqDiagnostics.ExpectedException(ex);
                }
            }
        }

        /// <summary>
        /// Consumes a previously received message so it is not delivered again.
        /// For transactional queues the ambient transaction is rolled back and the message
        /// is re-received by lookup id in a new transaction (optionally marked rejected),
        /// retrying while it is still locked under the rolled-back transaction.
        /// For non-transactional queues only the "message consumed" trace is emitted.
        /// </summary>
        /// <param name="messageProperty">Identifies the message (lookup id and message id).</param>
        /// <param name="reject">When true, a successfully re-received message is marked rejected.</param>
        internal void DropOrRejectReceivedMessage(MsmqMessageProperty messageProperty, bool reject)
        {
            if (this.Transactional)
            {
                TryAbortTransactionCurrent();
                IPostRollbackErrorStrategy postRollback = new SimplePostRollbackErrorStrategy(messageProperty.LookupId);
                MsmqQueue.MoveReceiveResult result = MsmqQueue.MoveReceiveResult.Unknown;
                do
                {
                    using (MsmqEmptyMessage emptyMessage = new MsmqEmptyMessage())
                    {
                        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.RequiresNew))
                        {
                            result = this.Queue.TryReceiveByLookupId(messageProperty.LookupId, emptyMessage, MsmqTransactionMode.CurrentOrThrow);
                            if (MsmqQueue.MoveReceiveResult.Succeeded == result && reject)
                            {
                                this.Queue.MarkMessageRejected(messageProperty.LookupId);
                            }
                            scope.Complete();
                        }
                    }

                    if (result == MsmqQueue.MoveReceiveResult.Succeeded)
                    {
                        MsmqDiagnostics.MessageConsumed(instanceId, messageProperty.MessageId, Msmq.IsRejectMessageSupported);
                    }

                    // Only a message still locked under the rolled-back transaction is worth retrying.
                    if (result != MsmqQueue.MoveReceiveResult.MessageLockedUnderTransaction)
                    {
                        break;
                    }
                }
                while (postRollback.AnotherTryNeeded());
            }
            else
            {
                MsmqDiagnostics.MessageConsumed(instanceId, messageProperty.MessageId, false);
            }
        }

        /// <summary>
        /// Rolls back the ambient transaction and moves the message identified by
        /// <paramref name="lookupId"/> from one queue to another, retrying while the
        /// message is still locked under a transaction.
        /// </summary>
        internal static void MoveReceivedMessage(MsmqQueue queueFrom, MsmqQueue queueTo, long lookupId)
        {
            TryAbortTransactionCurrent();
            IPostRollbackErrorStrategy postRollback = new SimplePostRollbackErrorStrategy(lookupId);
            MsmqQueue.MoveReceiveResult result = MsmqQueue.MoveReceiveResult.Unknown;
            do
            {
                result = queueFrom.TryMoveMessage(lookupId, queueTo, MsmqTransactionMode.Single);
                if (result != MsmqQueue.MoveReceiveResult.MessageLockedUnderTransaction)
                {
                    break;
                }
            }
            while (postRollback.AnotherTryNeeded());
        }

        /// <summary>Forwards the message to the poison handler's final disposition.</summary>
        internal void FinalDisposition(MsmqMessageProperty messageProperty)
        {
            this.poisonHandler.FinalDisposition(messageProperty);
        }

        // WaitForMessage

        /// <summary>
        /// Peeks the queue for up to <paramref name="timeout"/>.
        /// </summary>
        /// <returns>False only when the peek timed out; true for every other peek result.</returns>
        internal bool WaitForMessage(TimeSpan timeout)
        {
            using (MsmqEmptyMessage message = new MsmqEmptyMessage())
            {
                return (MsmqQueue.ReceiveResult.Timeout != this.queue.TryPeek(message, timeout));
            }
        }

        /// <summary>Starts an asynchronous peek; complete it with <see cref="EndWaitForMessage"/>.</summary>
        internal IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new WaitForMessageAsyncResult(this.queue, timeout, callback, state);
        }

        /// <summary>Completes <see cref="BeginWaitForMessage"/>; false when the peek timed out.</summary>
        public bool EndWaitForMessage(IAsyncResult result)
        {
            return WaitForMessageAsyncResult.End(result);
        }

        /// <summary>
        /// Receives one message into <paramref name="msmqMessage"/>.
        /// </summary>
        /// <param name="msmqMessage">Buffer that receives the message.</param>
        /// <param name="timeout">Maximum time to wait.</param>
        /// <param name="transactionMode">Transaction mode passed to the queue receive.</param>
        /// <param name="property">
        /// Set to the message property on a successful receive; null on timeout or
        /// when the operation was cancelled.
        /// </param>
        /// <returns>False on timeout; true otherwise (including a cancelled operation).</returns>
        /// <exception cref="MsmqPoisonMessageException">
        /// The queue is transactional and the poison handler reported the message as poison.
        /// </exception>
        internal bool TryReceive(MsmqInputMessage msmqMessage, TimeSpan timeout, MsmqTransactionMode transactionMode, out MsmqMessageProperty property)
        {
            property = null;

            MsmqQueue.ReceiveResult receiveResult = this.Queue.TryReceive(msmqMessage, timeout, transactionMode);
            if (MsmqQueue.ReceiveResult.OperationCancelled == receiveResult)
            {
                return true;
            }
            if (MsmqQueue.ReceiveResult.Timeout == receiveResult)
            {
                return false;
            }

            property = new MsmqMessageProperty(msmqMessage);
            if (this.Transactional)
            {
                if (this.PoisonHandler.CheckAndHandlePoisonMessage(property))
                {
                    // Capture the id before clearing the out parameter so the caller never
                    // sees a property for a poison message.
                    long lookupId = property.LookupId;
                    property = null;
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new MsmqPoisonMessageException(lookupId));
                }
            }
            return true;
        }

        /// <summary>
        /// Starts an asynchronous receive. ExactlyOnce queues use the transacted
        /// implementation; all others use the queue's native asynchronous receive.
        /// </summary>
        internal IAsyncResult BeginTryReceive(MsmqInputMessage msmqMessage, TimeSpan timeout, MsmqTransactionMode transactionMode, AsyncCallback callback, object state)
        {
            if (this.receiveParameters.ExactlyOnce)
            {
                return new TryTransactedReceiveAsyncResult(this, msmqMessage, timeout, transactionMode, callback, state);
            }
            else
            {
                return new TryNonTransactedReceiveAsyncResult(this, msmqMessage, timeout, callback, state);
            }
        }

        /// <summary>
        /// Completes <see cref="BeginTryReceive"/>.
        /// </summary>
        /// <returns>False when the receive timed out; true otherwise.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="result"/> is null.</exception>
        /// <exception cref="ArgumentException">
        /// <paramref name="result"/> was not produced by the matching Begin call.
        /// </exception>
        internal bool EndTryReceive(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty msmqProperty)
        {
            msmqMessage = null;
            msmqProperty = null;

            if (null == result)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
            }

            if (this.receiveParameters.ExactlyOnce)
            {
                TryTransactedReceiveAsyncResult receiveResult = result as TryTransactedReceiveAsyncResult;
                if (null == receiveResult)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument(SR.GetString(SR.InvalidAsyncResult));
                }
                return TryTransactedReceiveAsyncResult.End(receiveResult, out msmqMessage, out msmqProperty);
            }
            else
            {
                TryNonTransactedReceiveAsyncResult receiveResult = result as TryNonTransactedReceiveAsyncResult;
                if (null == receiveResult)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument(SR.GetString(SR.InvalidAsyncResult));
                }
                return TryNonTransactedReceiveAsyncResult.End(receiveResult, out msmqMessage, out msmqProperty);
            }
        }

        // TryReceiveAsyncResult (tx version)

        /// <summary>
        /// Transacted asynchronous receive: schedules a callback that runs the synchronous
        /// <see cref="MsmqReceiveHelper.TryReceive"/> with the transaction that was ambient
        /// when the operation was started.
        /// </summary>
        class TryTransactedReceiveAsyncResult : AsyncResult
        {
            bool expired;
            MsmqReceiveHelper receiver;
            TimeoutHelper timeoutHelper;
            Transaction txCurrent;
            MsmqInputMessage msmqMessage;
            MsmqMessageProperty messageProperty;
            MsmqTransactionMode transactionMode;
            static WaitCallback onComplete = OnComplete;

            internal TryTransactedReceiveAsyncResult(
                MsmqReceiveHelper receiver,
                MsmqInputMessage msmqMessage,
                TimeSpan timeout,
                MsmqTransactionMode transactionMode,
                AsyncCallback callback,
                object state)
                : base(callback, state)
            {
                this.timeoutHelper = new TimeoutHelper(timeout);
                // Capture the caller's ambient transaction so the callback can restore it.
                this.txCurrent = Transaction.Current;
                this.receiver = receiver;
                this.msmqMessage = msmqMessage;
                this.transactionMode = transactionMode;
                // Schedule last: every field the callback reads is assigned above.
                IOThreadScheduler.ScheduleCallback(onComplete, this);
            }

            static void OnComplete(object parameter)
            {
                // Direct cast: a wrong state object should fail loudly here rather than as a
                // NullReferenceException further down.
                TryTransactedReceiveAsyncResult result = (TryTransactedReceiveAsyncResult)parameter;

                Transaction savedTransaction = Transaction.Current;
                Transaction.Current = result.txCurrent;
                try
                {
                    Exception ex = null;
                    try
                    {
                        result.expired = !result.receiver.TryReceive(result.msmqMessage, result.timeoutHelper.RemainingTime(), result.transactionMode, out result.messageProperty);
                    }
                    catch (Exception e)
                    {
                        if (DiagnosticUtility.IsFatal(e))
                        {
                            throw;
                        }
                        ex = e;
                    }

                    result.Complete(false, ex);
                }
                finally
                {
                    // Always restore whatever transaction this thread had before.
                    Transaction.Current = savedTransaction;
                }
            }

            internal static bool End(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty property)
            {
                TryTransactedReceiveAsyncResult receiveResult = AsyncResult.End<TryTransactedReceiveAsyncResult>(result);
                msmqMessage = receiveResult.msmqMessage;
                property = receiveResult.messageProperty;
                return !receiveResult.expired;
            }
        }

        // TryReceiveAsyncResult (non-tx version)

        /// <summary>
        /// Non-transacted asynchronous receive built on the queue's own
        /// BeginTryReceive/EndTryReceive pair.
        /// </summary>
        class TryNonTransactedReceiveAsyncResult : AsyncResult
        {
            MsmqQueue.ReceiveResult receiveResult;
            MsmqReceiveHelper receiver;
            MsmqInputMessage msmqMessage;
            static AsyncCallback onCompleteStatic = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnCompleteStatic));

            internal TryNonTransactedReceiveAsyncResult(MsmqReceiveHelper receiver, MsmqInputMessage msmqMessage, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.receiver = receiver;
                this.msmqMessage = msmqMessage;
                receiver.Queue.BeginTryReceive(msmqMessage, timeout, onCompleteStatic, this);
            }

            static void OnCompleteStatic(IAsyncResult result)
            {
                ((TryNonTransactedReceiveAsyncResult)result.AsyncState).OnComplete(result);
            }

            void OnComplete(IAsyncResult result)
            {
                Exception ex = null;
                try
                {
                    receiveResult = receiver.Queue.EndTryReceive(result);
                }
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw;
                    }
                    ex = e;
                }
                Complete(result.CompletedSynchronously, ex);
            }

            internal static bool End(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty property)
            {
                TryNonTransactedReceiveAsyncResult asyncResult = AsyncResult.End<TryNonTransactedReceiveAsyncResult>(result);
                msmqMessage = asyncResult.msmqMessage;
                property = null;

                if (MsmqQueue.ReceiveResult.Timeout == asyncResult.receiveResult)
                {
                    return false;
                }
                else if (MsmqQueue.ReceiveResult.OperationCancelled == asyncResult.receiveResult)
                {
                    return true;
                }
                else
                {
                    property = new MsmqMessageProperty(msmqMessage);
                    return true;
                }
            }
        }

        // WaitForMessageAsyncResult

        /// <summary>
        /// Asynchronous peek whose boolean result is false only when the peek timed out.
        /// </summary>
        class WaitForMessageAsyncResult : TypedAsyncResult<bool>
        {
            MsmqQueue msmqQueue;
            MsmqEmptyMessage msmqMessage;
            static AsyncCallback onCompleteStatic = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnCompleteStatic));

            public WaitForMessageAsyncResult(MsmqQueue msmqQueue, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.msmqMessage = new MsmqEmptyMessage();
                this.msmqQueue = msmqQueue;
                this.msmqQueue.BeginPeek(this.msmqMessage, timeout, onCompleteStatic, this);
            }

            static void OnCompleteStatic(IAsyncResult result)
            {
                ((WaitForMessageAsyncResult)result.AsyncState).OnComplete(result);
            }

            void OnComplete(IAsyncResult result)
            {
                // The peeked message is not used below; only the result code is.
                this.msmqMessage.Dispose();

                MsmqQueue.ReceiveResult receiveResult = MsmqQueue.ReceiveResult.Unknown;
                Exception ex = null;
                try
                {
                    receiveResult = this.msmqQueue.EndPeek(result);
                }
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw;
                    }
                    ex = e;
                }

                if (null == ex)
                {
                    Complete(receiveResult != MsmqQueue.ReceiveResult.Timeout, result.CompletedSynchronously);
                }
                else
                {
                    Complete(result.CompletedSynchronously, ex);
                }
            }
        }

        // PostRollbackErrorStrategy

        /// <summary>Decides whether a move/receive that hit a locked message should be retried.</summary>
        interface IPostRollbackErrorStrategy
        {
            bool AnotherTryNeeded();
        }

        // SimplePostRollbackErrorStrategy

        /// <summary>
        /// Fixed retry policy: sleeps 100 ms between tries and stops asking for retries
        /// once the attempt counter (initially 50) is exhausted.
        /// </summary>
        class SimplePostRollbackErrorStrategy : IPostRollbackErrorStrategy
        {
            const int Attempts = 50;
            const int MillisecondsToSleep = 100;

            int attemptsLeft = Attempts;
            long lookupId;

            internal SimplePostRollbackErrorStrategy(long lookupId)
            {
                this.lookupId = lookupId;
            }

            bool IPostRollbackErrorStrategy.AnotherTryNeeded()
            {
                if (--this.attemptsLeft > 0)
                {
                    // Trace the lock only once, on the first retry.
                    if (attemptsLeft == (Attempts - 1))
                    {
                        MsmqDiagnostics.MessageLockedUnderTheTransaction(lookupId);
                    }
                    Thread.Sleep(TimeSpan.FromMilliseconds(MillisecondsToSleep));
                    return true;
                }
                else
                {
                    MsmqDiagnostics.MoveOrDeleteAttemptFailed(lookupId);
                    return false;
                }
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- TimeZone.cs
- StateMachineHelpers.cs
- XhtmlBasicImageAdapter.cs
- DynamicControl.cs
- HttpListener.cs
- CalculatedColumn.cs
- WindowsSolidBrush.cs
- _NegoStream.cs
- SystemMulticastIPAddressInformation.cs
- UInt16Converter.cs
- BoundField.cs
- HtmlElement.cs
- DataExpression.cs
- ObjectList.cs
- ComPlusInstanceContextInitializer.cs
- ACE.cs
- NamedPermissionSet.cs
- BamlBinaryWriter.cs
- OdbcFactory.cs
- DelegatedStream.cs
- EmptyStringExpandableObjectConverter.cs
- InvokeCompletedEventArgs.cs
- IgnoreSection.cs
- ListenerPerfCounters.cs
- EnumValidator.cs
- TypedDataSetSchemaImporterExtension.cs
- DelegateSerializationHolder.cs
- SHA256Cng.cs
- MSHTMLHost.cs
- Exceptions.cs
- MouseEventArgs.cs
- StructuredType.cs
- bidPrivateBase.cs
- JsonUriDataContract.cs
- DirtyTextRange.cs
- CodeSubDirectory.cs
- TypeConverterHelper.cs
- FlowDecisionLabelFeature.cs
- ApplicationFileParser.cs
- VolatileEnlistmentState.cs
- XmlSchemaIdentityConstraint.cs
- ColorTransformHelper.cs
- AuthorizationPolicyTypeElement.cs
- ResXFileRef.cs
- WindowInteropHelper.cs
- CollectionContainer.cs
- FreezableOperations.cs
- HttpHandlersInstallComponent.cs
- DocumentPageHost.cs
- AmbiguousMatchException.cs
- DataGridViewColumnTypePicker.cs
- SchemaRegistration.cs
- ListViewItem.cs
- HWStack.cs
- safelinkcollection.cs
- ValidationError.cs
- LayoutSettings.cs
- ConfigurationValidatorBase.cs
- SqlFactory.cs
- OpCellTreeNode.cs
- CompilerTypeWithParams.cs
- PersistenceException.cs
- ProviderCommandInfoUtils.cs
- EventLogStatus.cs
- XmlDeclaration.cs
- StylusCaptureWithinProperty.cs
- SettingsAttributeDictionary.cs
- LogLogRecordEnumerator.cs
- FrameworkElement.cs
- TextWriterTraceListener.cs
- Quad.cs
- SecurityContextSecurityTokenParameters.cs
- ModulesEntry.cs
- DataTablePropertyDescriptor.cs
- FunctionImportMapping.ReturnTypeRenameMapping.cs
- SolidBrush.cs
- SelectedDatesCollection.cs
- AccessorTable.cs
- HtmlControlAdapter.cs
- ContentHostHelper.cs
- EraserBehavior.cs
- DesignTimeResourceProviderFactoryAttribute.cs
- DataFormats.cs
- LocatorPart.cs
- String.cs
- RsaSecurityTokenAuthenticator.cs
- TdsParserHelperClasses.cs
- ConstantProjectedSlot.cs
- RegionData.cs
- DecoderBestFitFallback.cs
- XmlDocumentSerializer.cs
- SQLChars.cs
- Internal.cs
- FileDetails.cs
- GradientBrush.cs
- _Rfc2616CacheValidators.cs
- TextEffect.cs
- LogSwitch.cs
- HiddenFieldPageStatePersister.cs
- MemberAccessException.cs