Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / WF / RunTime / Scheduler.cs / 1305376 / Scheduler.cs
using System;
using System.Globalization;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Transactions;
using System.Workflow.ComponentModel;

namespace System.Workflow.Runtime
{
    #region Scheduler

    // Only one instance of this type is used for a workflow instance.
    //
    // The scheduler keeps two FIFO queues of SchedulableItem (high priority and
    // normal priority), both stored as attached properties on the root activity,
    // and runs the queued items one at a time from Run().
    internal class Scheduler
    {
        #region data

        // state to be persisted for the scheduler
        internal static DependencyProperty HighPriorityEntriesQueueProperty = DependencyProperty.RegisterAttached("HighPriorityEntriesQueue", typeof(Queue<SchedulableItem>), typeof(Scheduler));
        internal static DependencyProperty NormalPriorityEntriesQueueProperty = DependencyProperty.RegisterAttached("NormalPriorityEntriesQueue", typeof(Queue<SchedulableItem>), typeof(Scheduler));

        private readonly Queue<SchedulableItem> highPriorityEntriesQueue;
        private readonly Queue<SchedulableItem> normalPriorityEntriesQueue;

        // non-persisted state for the scheduler
        private readonly WorkflowExecutor rootWorkflowExecutor;
        private bool empty;                     // true when GetItemToRun last found both queues empty
        private bool canRun;
        private bool threadRequested;           // true while a ScheduleForWork request is considered outstanding
        private bool abortOrTerminateRequested;
        private Queue<SchedulableItem> transactedEntries;   // items scheduled with transacted == true since the last PostPersist/Rollback
        private readonly object syncObject = new object();  // guards the queues and flags in GetItemToRun/ScheduleItem

        #endregion data

        #region ctors

        /// <summary>
        /// Creates the scheduler for <paramref name="rootExec"/>, picking up any queues
        /// already stored on its root activity (loading with some state).
        /// </summary>
        /// <param name="rootExec">Executor whose RootActivity carries the queue properties.</param>
        /// <param name="canRun">
        /// true if normal creation; false if loading from a persisted state
        /// (will be set to true later at ResumeOnIdle).
        /// </param>
        public Scheduler(WorkflowExecutor rootExec, bool canRun)
        {
            this.rootWorkflowExecutor = rootExec;
            this.threadRequested = false;

            // canRun is true if normal creation
            // false if loading from a persisted state. Will be set to true later at ResumeOnIdle
            this.canRun = canRun;

            this.highPriorityEntriesQueue = (Queue<SchedulableItem>)rootExec.RootActivity.GetValue(Scheduler.HighPriorityEntriesQueueProperty);
            this.normalPriorityEntriesQueue = (Queue<SchedulableItem>)rootExec.RootActivity.GetValue(Scheduler.NormalPriorityEntriesQueueProperty);

            // First use for this root activity: create the queues and attach them to it.
            if (this.highPriorityEntriesQueue == null)
            {
                this.highPriorityEntriesQueue = new Queue<SchedulableItem>();
                rootExec.RootActivity.SetValue(Scheduler.HighPriorityEntriesQueueProperty, this.highPriorityEntriesQueue);
            }
            if (this.normalPriorityEntriesQueue == null)
            {
                this.normalPriorityEntriesQueue = new Queue<SchedulableItem>();
                rootExec.RootActivity.SetValue(Scheduler.NormalPriorityEntriesQueueProperty, this.normalPriorityEntriesQueue);
            }

            this.empty = ((this.normalPriorityEntriesQueue.Count == 0) && (this.highPriorityEntriesQueue.Count == 0));
        }

        #endregion ctors

        #region Misc properties

        /// <summary>Returns "Scheduler('name')" using the workflow definition's QualifiedName.</summary>
        public override string ToString()
        {
            return "Scheduler('" + ((Activity)this.RootWorkflowExecutor.WorkflowDefinition).QualifiedName + "')";
        }

        /// <summary>The executor passed to the constructor.</summary>
        protected WorkflowExecutor RootWorkflowExecutor
        {
            get { return this.rootWorkflowExecutor; }
        }

        /// <summary>
        /// Current value of the "empty" flag: set from the queue counts in the constructor,
        /// set to true by GetItemToRun when both queues are empty, cleared by ScheduleItem.
        /// </summary>
        public bool IsStalledNow
        {
            get { return empty; }
        }

        /// <summary>Whether normal-priority items may be picked and threads requested.</summary>
        public bool CanRun
        {
            get { return canRun; }
            set { canRun = value; }
        }

        /// <summary>When true, GetItemToRun hands out no further items.</summary>
        internal bool AbortOrTerminateRequested
        {
            get { return abortOrTerminateRequested; }
            set { abortOrTerminateRequested = value; }
        }

        #endregion Misc properties

        #region Run work

        /// <summary>
        /// Runs scheduled items until GetItemToRun returns null.
        /// Each iteration first lets the executor process its queued events, then resolves
        /// the item's context activity and target activity, enters a TransactionScope when
        /// the activity is in an atomic context, and runs the item.
        /// Exceptions raised while preparing or running an item are handed to
        /// WorkflowExecutor.ExceptionOccured, except those that
        /// WorkflowExecutor.IsIrrecoverableException reports as irrecoverable, which are rethrown.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// No context activity exists for the item's ContextId.
        /// </exception>
        public void Run()
        {
            do
            {
                this.RootWorkflowExecutor.ProcessQueuedEvents();

                // Get item to run
                SchedulableItem item = GetItemToRun();
                bool runningItem = false;

                // no ready work to run... go away
                if (item == null)
                    break;

                Activity itemActivity = null;
                Exception exp = null;
                TransactionalProperties transactionalProperties = null;
                int contextId = item.ContextId;

                // This function gets the root or enclosing while-loop activity
                Activity contextActivity = this.RootWorkflowExecutor.GetContextActivityForId(contextId);
                if (contextActivity == null)
                    throw new InvalidOperationException(ExecutionStringManager.InvalidExecutionContext);

                // This is the activity corresponding to the item's ActivityId
                itemActivity = contextActivity.GetActivityByName(item.ActivityId);
                using (new ServiceEnvironment(itemActivity))
                {
                    exp = null;
                    bool ignoreFinallyBlock = false;

                    try
                    {
                        // item preamble
                        // set up the item transactional context if necessary
                        //
                        // NOTE(review): treated as a live statement following an empty "//" line,
                        // like the other comment blocks in this method - confirm against the original.
                        Debug.Assert(itemActivity != null, "null itemActivity");
                        if (itemActivity == null)
                            throw new InvalidOperationException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.InvalidActivityName, item.ActivityId));

                        Activity atomicActivity = null;
                        if (this.RootWorkflowExecutor.IsActivityInAtomicContext(itemActivity, out atomicActivity))
                        {
                            transactionalProperties = (TransactionalProperties)atomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty);

                            // If we've aborted for any reason stop now!
                            // If we attempt to enter a new TransactionScope the com+ context will get corrupted
                            // See windows se bug 137267
                            if (!WorkflowExecutor.CheckAndProcessTransactionAborted(transactionalProperties))
                            {
                                if (transactionalProperties.TransactionScope == null)
                                {
                                    // Use TimeSpan.Zero so scope will not create timeout independent of the transaction
                                    // Use EnterpriseServicesInteropOption.Full to flow transaction to COM+
                                    transactionalProperties.TransactionScope =
                                        new TransactionScope(transactionalProperties.Transaction, TimeSpan.Zero, EnterpriseServicesInteropOption.Full);

                                    // Uses the format overload like the other traces in this class; the
                                    // instance id is now separated from the text that follows it.
                                    WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 0,
                                        "Workflow Runtime: Scheduler: InstanceId: {0} : Entered into TransactionScope, Current atomic activity {1}",
                                        this.RootWorkflowExecutor.InstanceIdString, atomicActivity.Name);
                                }
                            }
                        }

                        // Run the item
                        //
                        runningItem = true;
                        WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1,
                            "Workflow Runtime: Scheduler: InstanceId: {0} : Running scheduled entry: {1}",
                            this.RootWorkflowExecutor.InstanceIdString, item.ToString());

                        // running any entry implicitly changes some state of the workflow instance
                        this.RootWorkflowExecutor.stateChangedSincePersistence = true;

                        item.Run(this.RootWorkflowExecutor);
                    }
                    catch (Exception e)
                    {
                        if (WorkflowExecutor.IsIrrecoverableException(e))
                        {
                            // Skip the trace/exception processing in the finally block and let it propagate.
                            ignoreFinallyBlock = true;
                            throw;
                        }
                        else
                        {
                            if (transactionalProperties != null)
                                transactionalProperties.TransactionState = TransactionProcessState.AbortProcessed;

                            // Remember the exception; it is reported from the finally block.
                            exp = e;
                        }
                    }
                    finally
                    {
                        if (!ignoreFinallyBlock)
                        {
                            if (runningItem)
                                WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1,
                                    "Workflow Runtime: Scheduler: InstanceId: {0} : Done with running scheduled entry: {1}",
                                    this.RootWorkflowExecutor.InstanceIdString, item.ToString());

                            // Process exception
                            //
                            if (exp != null)
                            {
                                //
                                this.RootWorkflowExecutor.ExceptionOccured(exp, itemActivity == null ? contextActivity : itemActivity, null);
                                exp = null;
                            }
                        }
                    }
                }
            } while (true);
        }

        /// <summary>
        /// Picks the next item to run, under the scheduler lock, and updates the
        /// "empty" and "threadRequested" flags.
        /// </summary>
        /// <returns>
        /// The dequeued item, or null when nothing is queued, an abort/terminate was
        /// requested, or only normal-priority work is queued and it may not run now.
        /// </returns>
        private SchedulableItem GetItemToRun()
        {
            SchedulableItem ret = null;

            lock (this.syncObject)
            {
                bool workToDo = false;
                if ((this.highPriorityEntriesQueue.Count > 0) || (this.normalPriorityEntriesQueue.Count > 0))
                {
                    workToDo = true;

                    // If an abort or termination of the workflow has been requested,
                    // then the workflow should try to terminate ASAP. Even transaction scopes
                    // in progress shouldn't be executed to completion. (Ref: 16534)
                    if (this.AbortOrTerminateRequested)
                    {
                        ret = null;
                    }
                    // got work to do in the scheduler
                    else if ((this.highPriorityEntriesQueue.Count > 0))
                    {
                        ret = this.highPriorityEntriesQueue.Dequeue();
                    }
                    else if (this.CanRun)
                    {
                        // the scheduler can run right now
                        //
                        // pick an entry to run
                        //
                        if (((IWorkflowCoreRuntime)this.RootWorkflowExecutor).CurrentAtomicActivity == null &&
                            (this.normalPriorityEntriesQueue.Count > 0))
                            ret = this.normalPriorityEntriesQueue.Dequeue();
                    }
                    else
                    {
                        // scheduler can't run right now.. even though there is ready work
                        // do nothing in the scheduler
                        ret = null;
                    }
                }

                if (!workToDo)
                {
                    // no ready work to do in the scheduler...
                    // we are gonna return the thread back
                    this.empty = true;
                }

                // set it to true only iff there is something to run
                this.threadRequested = (ret != null);
            }
            return ret;
        }

        // This method should be called only after we have determined that
        // this instance can start running now
        //
        // Marks the scheduler as runnable and, when work is queued, asks the
        // executor to schedule it.
        public void Resume()
        {
            canRun = true;
            if (!empty)
            {
                // There is scheduled work
                // ask the threadprovider for a thread
                this.RootWorkflowExecutor.ScheduleForWork();
            }
        }

        // Same as Resume, but never changes canRun: does nothing unless the
        // scheduler is already allowed to run.
        public void ResumeIfRunnable()
        {
            if (!canRun)
                return;

            if (!empty)
            {
                // There is scheduled work
                // ask the threadprovider for a thread
                this.RootWorkflowExecutor.ScheduleForWork();
            }
        }

        #endregion Run work

        #region Schedule work

        /// <summary>
        /// Queues <paramref name="s"/> and, if no thread request is outstanding and the
        /// scheduler can run, asks the executor to schedule work.
        /// </summary>
        /// <param name="s">Item to queue.</param>
        /// <param name="isInAtomicTransaction">true to place the item on the high-priority queue.</param>
        /// <param name="transacted">
        /// true to also record the item so that Rollback can remove it from the
        /// normal-priority queue.
        /// </param>
        public void ScheduleItem(SchedulableItem s, bool isInAtomicTransaction, bool transacted)
        {
            lock (this.syncObject)
            {
                WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1,
                    "Workflow Runtime: Scheduler: InstanceId: {0} : Scheduling entry: {1}",
                    this.RootWorkflowExecutor.InstanceIdString, s.ToString());

                // SchedulableItems in AtomicTransaction has higher priority
                Queue<SchedulableItem> q = isInAtomicTransaction ? this.highPriorityEntriesQueue : this.normalPriorityEntriesQueue;
                q.Enqueue(s);

                if (transacted)
                {
                    if (transactedEntries == null)
                        transactedEntries = new Queue<SchedulableItem>();
                    transactedEntries.Enqueue(s);
                }

                if (!this.threadRequested)
                {
                    if (this.CanRun)
                    {
                        this.RootWorkflowExecutor.ScheduleForWork();
                        this.threadRequested = true;
                    }
                }
                this.empty = false;
            }
        }

        #endregion Schedule work

        #region pseudo-transacted support

        /// <summary>Forgets the recorded transacted entries; the queued items themselves are kept.</summary>
        public void PostPersist()
        {
            transactedEntries = null;
        }

        /// <summary>
        /// Removes every recorded transacted entry from the normal-priority queue,
        /// keeping the remaining items in their original order, then forgets the record.
        /// Does nothing when no transacted entries are recorded.
        /// </summary>
        public void Rollback()
        {
            if (transactedEntries != null && transactedEntries.Count > 0)
            {
                // make a list of non-transacted entries
                // @undone: bmalhi: transacted entries only on priority-0
                Queue<SchedulableItem> newScheduled = new Queue<SchedulableItem>();
                foreach (SchedulableItem entry in this.normalPriorityEntriesQueue)
                {
                    if (!transactedEntries.Contains(entry))
                        newScheduled.Enqueue(entry);
                }

                // clear the scheduled items
                this.normalPriorityEntriesQueue.Clear();

                // schedule the non-transacted items back
                foreach (SchedulableItem entry in newScheduled)
                    this.normalPriorityEntriesQueue.Enqueue(entry);

                transactedEntries = null;
            }
        }

        #endregion pseudo-transacted support
    }

    #endregion Scheduler
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- EtwTrackingParticipant.cs
- MultipleViewPattern.cs
- CodeTypeReferenceCollection.cs
- DateTimeSerializationSection.cs
- SettingsPropertyCollection.cs
- DbConnectionHelper.cs
- BaseParser.cs
- BaseCollection.cs
- TabControlEvent.cs
- RegexMatchCollection.cs
- ColumnResult.cs
- GeometryDrawing.cs
- StaticDataManager.cs
- IdentifierService.cs
- XamlSerializerUtil.cs
- DerivedKeyCachingSecurityTokenSerializer.cs
- Grammar.cs
- LogExtentCollection.cs
- Bidi.cs
- SqlOuterApplyReducer.cs
- Quaternion.cs
- Variable.cs
- CorrelationTokenInvalidatedHandler.cs
- SmiEventStream.cs
- webbrowsersite.cs
- ModifierKeysConverter.cs
- ImportedNamespaceContextItem.cs
- ToolboxControl.cs
- DateBoldEvent.cs
- __ComObject.cs
- DateTimeConstantAttribute.cs
- LinkedResourceCollection.cs
- ComboBox.cs
- BindingOperations.cs
- FormClosingEvent.cs
- HostProtectionException.cs
- QilName.cs
- Error.cs
- coordinator.cs
- ClearCollection.cs
- EntityProviderServices.cs
- ToolboxItem.cs
- DateTimeFormatInfoScanner.cs
- PathParser.cs
- SchemaMapping.cs
- OptimisticConcurrencyException.cs
- ObjectKeyFrameCollection.cs
- MenuBindingsEditor.cs
- PermissionRequestEvidence.cs
- HitTestParameters3D.cs
- PropertiesTab.cs
- BinaryMessageEncoder.cs
- BrowserTree.cs
- FixedSOMPageElement.cs
- ContentFilePart.cs
- ReadWriteObjectLock.cs
- Column.cs
- DataGridPageChangedEventArgs.cs
- XamlToRtfWriter.cs
- MeasureData.cs
- ViewBase.cs
- StartFileNameEditor.cs
- WebPartExportVerb.cs
- CalendarSelectionChangedEventArgs.cs
- SystemTcpStatistics.cs
- safex509handles.cs
- DecimalAnimation.cs
- LineProperties.cs
- XmlNamespaceMapping.cs
- UpdatableWrapper.cs
- ConstNode.cs
- IDReferencePropertyAttribute.cs
- XamlSerializationHelper.cs
- ExpandoClass.cs
- ISO2022Encoding.cs
- StrokeRenderer.cs
- ObjectPropertyMapping.cs
- RegistryConfigurationProvider.cs
- DataKey.cs
- HtmlEncodedRawTextWriter.cs
- Registry.cs
- DetailsViewUpdateEventArgs.cs
- RTLAwareMessageBox.cs
- EndpointDiscoveryElement.cs
- PagesSection.cs
- TemplateBamlRecordReader.cs
- BrowserTree.cs
- WinInet.cs
- EncryptedReference.cs
- ImageList.cs
- ContextProperty.cs
- DataPagerCommandEventArgs.cs
- FileIOPermission.cs
- InstanceLockQueryResult.cs
- HtmlObjectListAdapter.cs
- StoreItemCollection.cs
- TreeViewImageKeyConverter.cs
- EmbeddedMailObjectsCollection.cs
- AssemblyName.cs
- FormsAuthenticationConfiguration.cs