Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / ForAllOperator.cs / 1305376 / ForAllOperator.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ForAllQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Threading;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    /// <summary>
    /// A forall operator just enables an action to be placed at the "top" of a query tree
    /// instead of yielding an enumerator that some consumer can walk. We execute the
    /// query for effect instead of yielding a data result.
    /// </summary>
    /// <typeparam name="TInput">The type of the elements the per-element action is invoked on.</typeparam>
    internal sealed class ForAllOperator<TInput> : UnaryQueryOperator<TInput, TInput>
    {
        // The per-element action to be invoked.
        private readonly Action<TInput> m_elementAction;

        //----------------------------------------------------------------------------------------
        // Constructs a new forall operator.
        //
        // Arguments:
        //     child         - the data source this operator consumes; asserted non-null
        //     elementAction - the action invoked for each element; asserted non-null
        //

        internal ForAllOperator(IEnumerable<TInput> child, Action<TInput> elementAction)
            : base(child)
        {
            Contract.Assert(child != null, "child data source cannot be null");
            Contract.Assert(elementAction != null, "need a function");

            m_elementAction = elementAction;
        }

        //---------------------------------------------------------------------------------------
        // This invokes the entire query tree, invoking the per-element action for each result.
        //

        internal void RunSynchronously()
        {
            Contract.Assert(m_elementAction != null);

            // Get the enumerator w/out using pipelining. By the time this returns, the query
            // has been executed and we are done. We expect the return to be null.
            Shared<bool> dummyTopLevelDisposeFlag = new Shared<bool>(false);

            CancellationTokenSource dummyInternalCancellationTokenSource = new CancellationTokenSource();

            // stuff in appropriate defaults for unspecified options.
            QuerySettings settingsWithDefaults = SpecifiedQuerySettings
                .WithPerExecutionSettings(dummyInternalCancellationTokenSource, dummyTopLevelDisposeFlag)
                .WithDefaults();

            QueryLifecycle.LogicalQueryExecutionBegin(settingsWithDefaults.QueryId);

            // NOTE(review): if GetOpenedEnumerator throws, the CleanStateAtQueryEnd and
            // LogicalQueryExecutionEnd calls below are skipped - confirm this is intended.
            IEnumerator<TInput> enumerator = GetOpenedEnumerator(
                ParallelMergeOptions.FullyBuffered, true, true, settingsWithDefaults);
            settingsWithDefaults.CleanStateAtQueryEnd();
            Contract.Assert(enumerator == null);

            QueryLifecycle.LogicalQueryExecutionEnd(settingsWithDefaults.QueryId);
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        //

        internal override QueryResults<TInput> Open(
            QuerySettings settings, bool preferStriping)
        {
            // We just open the child operator.
            QueryResults<TInput> childQueryResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Wraps every partition of the input stream in a ForAllEnumerator that shares this
        // operator's element action and the query's merged cancellation token, then hands the
        // resulting stream (same partition count) to the recipient.
        //

        internal override void WrapPartitionedStream<TKey>(
            PartitionedStream<TInput, TKey> inputStream, IPartitionedStreamRecipient<TInput> recipient,
            bool preferStriping, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;
            PartitionedStream<TInput, int> outputStream = new PartitionedStream<TInput, int>(
                partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);

            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new ForAllEnumerator<TKey>(
                    inputStream[i], m_elementAction, settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        // Not supported on this operator: asserts and then always throws
        // InvalidOperationException.
        //

        internal override IEnumerable<TInput> AsSequentialQuery(CancellationToken token)
        {
            Contract.Assert(false, "AsSequentialQuery is not supported on ForAllOperator");
            throw new InvalidOperationException();
        }

        //----------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //---------------------------------------------------------------------------------------
        // The executable form of a forall operator. When it is enumerated, the entire underlying
        // partition is walked, invoking the per-element action for each item.
        //

        private class ForAllEnumerator<TKey> : QueryOperatorEnumerator<TInput, int>
        {
            private readonly QueryOperatorEnumerator<TInput, TKey> m_source; // The data source.
            private readonly Action<TInput> m_elementAction; // Forall operator being executed.
            private CancellationToken m_cancellationToken; // Token used to cancel this operator.

            //----------------------------------------------------------------------------------------
            // Constructs a new forall enumerator object.
            //

            internal ForAllEnumerator(
                QueryOperatorEnumerator<TInput, TKey> source, Action<TInput> elementAction,
                CancellationToken cancellationToken)
            {
                Contract.Assert(source != null);
                Contract.Assert(elementAction != null);

                m_source = source;
                m_elementAction = elementAction;
                m_cancellationToken = cancellationToken;
            }

            //----------------------------------------------------------------------------------------
            // Just walks the entire data source upon its first invocation, performing the per-
            // element action for each element. Always returns false; the ref arguments are
            // never written.
            //

            internal override bool MoveNext(ref TInput currentElement, ref int currentKey)
            {
                Contract.Assert(m_elementAction != null, "expected a compiled operator");

                // We just scroll through the enumerator and execute the action. Because we execute
                // "in place", we actually never even produce a single value.

                // Cancellation testing must be performed here as full enumeration occurs within this method.
                // We only need to throw a simple exception here.. marshalling logic handled via
                // QueryTaskGroupState.QueryEnd (called by ForAllSpoolingTask)
                TInput element = default(TInput);
                TKey keyUnused = default(TKey);
                int i = 0;
                while (m_source.MoveNext(ref element, ref keyUnused))
                {
                    // Poll for cancellation whenever the masked counter is zero, which includes
                    // the very first element (i == 0).
                    if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                    {
                        CancellationState.ThrowIfCanceled(m_cancellationToken);
                    }

                    m_elementAction(element);
                }

                return false;
            }

            //----------------------------------------------------------------------------------------
            // Disposes the wrapped source enumerator. The 'disposing' flag is not consulted.
            //

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_source != null);
                m_source.Dispose();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- DataConnectionHelper.cs
- ByteViewer.cs
- PartialArray.cs
- _ContextAwareResult.cs
- WorkflowInstanceProvider.cs
- RewritingPass.cs
- WarningException.cs
- documentsequencetextpointer.cs
- nulltextcontainer.cs
- Brush.cs
- ProfessionalColorTable.cs
- SqlCachedBuffer.cs
- EventBookmark.cs
- Collection.cs
- WsatEtwTraceListener.cs
- DataViewManagerListItemTypeDescriptor.cs
- SynchronousChannelMergeEnumerator.cs
- SelectedDatesCollection.cs
- TextBoxAutomationPeer.cs
- StandardOleMarshalObject.cs
- HwndHost.cs
- PathSegment.cs
- KeyboardNavigation.cs
- SchemaNotation.cs
- MultiView.cs
- EntityCodeGenerator.cs
- HtmlTable.cs
- SHA512Managed.cs
- ToolStripHighContrastRenderer.cs
- RijndaelManaged.cs
- PeerTransportSecuritySettings.cs
- FileDialogCustomPlacesCollection.cs
- TrustManager.cs
- URLMembershipCondition.cs
- ObjectQueryProvider.cs
- DesignerForm.cs
- APCustomTypeDescriptor.cs
- ObjectDataSourceDisposingEventArgs.cs
- ImageConverter.cs
- QueryPrefixOp.cs
- SQLGuid.cs
- Console.cs
- IPPacketInformation.cs
- Bidi.cs
- DateTimeFormat.cs
- BitmapPalettes.cs
- TextElementEnumerator.cs
- EventDescriptorCollection.cs
- BufferedStream.cs
- DiscoveryClientElement.cs
- ExceptionRoutedEventArgs.cs
- NodeFunctions.cs
- MobileControl.cs
- EnglishPluralizationService.cs
- StreamReader.cs
- SqlNotificationRequest.cs
- newitemfactory.cs
- ValueType.cs
- RSAPKCS1KeyExchangeFormatter.cs
- UnsafeNativeMethods.cs
- RemotingSurrogateSelector.cs
- MultiDataTrigger.cs
- ProviderIncompatibleException.cs
- InstanceDescriptor.cs
- RewritingProcessor.cs
- SplitterEvent.cs
- DataServiceHost.cs
- UrlPropertyAttribute.cs
- DbConnectionPool.cs
- ProfileSection.cs
- DataQuery.cs
- Int32EqualityComparer.cs
- ScriptRef.cs
- DataSourceProvider.cs
- StatusBarPanel.cs
- RegistryKey.cs
- CodeAttributeDeclarationCollection.cs
- HtmlWindowCollection.cs
- SafeBitVector32.cs
- InputReferenceExpression.cs
- ProfilePropertySettingsCollection.cs
- OverflowException.cs
- RangeContentEnumerator.cs
- ObjectStateEntryOriginalDbUpdatableDataRecord.cs
- ListViewDeleteEventArgs.cs
- ScriptControl.cs
- TransactionChannel.cs
- InvalidPropValue.cs
- SchemaNotation.cs
- TreeView.cs
- UnicodeEncoding.cs
- ListViewItemMouseHoverEvent.cs
- EntityAdapter.cs
- UDPClient.cs
- NegotiateStream.cs
- ping.cs
- userdatakeys.cs
- SqlNode.cs
- WSHttpBindingCollectionElement.cs
- ThemeInfoAttribute.cs