Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / WF / RunTime / Scheduler.cs / 1305376 / Scheduler.cs
using System;
using System.Globalization;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Transactions;
using System.Workflow.ComponentModel;

namespace System.Workflow.Runtime
{
    #region Scheduler

    // Only one instance of this type is used for a workflow instance.
    //
    // The scheduler owns two queues of SchedulableItem (high and normal priority)
    // that are stored on the root activity through attached dependency properties,
    // and pumps them one item at a time from Run().
    class Scheduler
    {
        #region data

        // state to be persisted for the scheduler
        internal static DependencyProperty HighPriorityEntriesQueueProperty = DependencyProperty.RegisterAttached("HighPriorityEntriesQueue", typeof(Queue<SchedulableItem>), typeof(Scheduler));
        internal static DependencyProperty NormalPriorityEntriesQueueProperty = DependencyProperty.RegisterAttached("NormalPriorityEntriesQueue", typeof(Queue<SchedulableItem>), typeof(Scheduler));
        Queue<SchedulableItem> highPriorityEntriesQueue;
        Queue<SchedulableItem> normalPriorityEntriesQueue;

        // non-persisted state for the scheduler
        WorkflowExecutor rootWorkflowExecutor;
        bool empty;
        bool canRun;
        bool threadRequested;
        bool abortOrTerminateRequested;
        Queue<SchedulableItem> transactedEntries;

        // guards the queues and the threadRequested/empty flags in
        // GetItemToRun() and ScheduleItem()
        readonly object syncObject = new object();

        #endregion data

        #region ctors

        // loading with some state
        //
        // rootExec: executor whose root activity carries the (possibly persisted) queues.
        // canRun:   true if normal creation, false if loading from a persisted state
        //           (the flag is writable through the CanRun property and is set by Resume()).
        public Scheduler(WorkflowExecutor rootExec, bool canRun)
        {
            this.rootWorkflowExecutor = rootExec;
            this.threadRequested = false;

            // canRun is true if normal creation
            // false if loading from a persisted state. Will be set to true later at ResumeOnIdle
            this.canRun = canRun;

            // Pick up queues already attached to the root activity; create and attach
            // fresh ones when the property has no value yet.
            this.highPriorityEntriesQueue = (Queue<SchedulableItem>)rootExec.RootActivity.GetValue(Scheduler.HighPriorityEntriesQueueProperty);
            this.normalPriorityEntriesQueue = (Queue<SchedulableItem>)rootExec.RootActivity.GetValue(Scheduler.NormalPriorityEntriesQueueProperty);

            if (this.highPriorityEntriesQueue == null)
            {
                this.highPriorityEntriesQueue = new Queue<SchedulableItem>();
                rootExec.RootActivity.SetValue(Scheduler.HighPriorityEntriesQueueProperty, this.highPriorityEntriesQueue);
            }

            if (this.normalPriorityEntriesQueue == null)
            {
                this.normalPriorityEntriesQueue = new Queue<SchedulableItem>();
                rootExec.RootActivity.SetValue(Scheduler.NormalPriorityEntriesQueueProperty, this.normalPriorityEntriesQueue);
            }

            this.empty = ((this.normalPriorityEntriesQueue.Count == 0) && (this.highPriorityEntriesQueue.Count == 0));
        }

        #endregion ctors

        #region Misc properties

        public override string ToString()
        {
            return "Scheduler('" + ((Activity)this.RootWorkflowExecutor.WorkflowDefinition).QualifiedName + "')";
        }

        protected WorkflowExecutor RootWorkflowExecutor
        {
            get { return this.rootWorkflowExecutor; }
        }

        // True once GetItemToRun() has found both queues empty; reset by ScheduleItem().
        public bool IsStalledNow
        {
            get { return empty; }
        }

        public bool CanRun
        {
            get { return canRun; }
            set { canRun = value; }
        }

        // When set, GetItemToRun() stops handing out items even if work is queued.
        internal bool AbortOrTerminateRequested
        {
            get { return abortOrTerminateRequested; }
            set { abortOrTerminateRequested = value; }
        }

        #endregion Misc properties

        #region Run work

        // Pumps scheduled items until GetItemToRun() returns null.
        //
        // Throws InvalidOperationException when the item's context activity cannot be
        // resolved. Exceptions raised while preparing or running an item are handed to
        // WorkflowExecutor.ExceptionOccured, except those that
        // WorkflowExecutor.IsIrrecoverableException reports, which propagate to the caller.
        public void Run()
        {
            do
            {
                this.RootWorkflowExecutor.ProcessQueuedEvents();

                // Get item to run
                SchedulableItem item = GetItemToRun();
                bool runningItem = false;

                // no ready work to run... go away
                if (item == null)
                {
                    break;
                }

                Activity itemActivity = null;
                Exception exp = null;
                TransactionalProperties transactionalProperties = null;
                int contextId = item.ContextId;

                // This function gets the root or enclosing while-loop activity
                Activity contextActivity = this.RootWorkflowExecutor.GetContextActivityForId(contextId);
                if (contextActivity == null)
                {
                    throw new InvalidOperationException(ExecutionStringManager.InvalidExecutionContext);
                }

                // This is the activity corresponding to the item's ActivityId
                itemActivity = contextActivity.GetActivityByName(item.ActivityId);

                using (new ServiceEnvironment(itemActivity))
                {
                    exp = null;
                    bool ignoreFinallyBlock = false;

                    try
                    {
                        // item preamble
                        // set up the item transactional context if necessary
                        //
                        Debug.Assert(itemActivity != null, "null itemActivity");
                        if (itemActivity == null)
                        {
                            // thrown inside the try so that it is reported through
                            // ExceptionOccured against the context activity below
                            throw new InvalidOperationException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.InvalidActivityName, item.ActivityId));
                        }

                        Activity atomicActivity = null;
                        if (this.RootWorkflowExecutor.IsActivityInAtomicContext(itemActivity, out atomicActivity))
                        {
                            transactionalProperties = (TransactionalProperties)atomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty);

                            // If we've aborted for any reason stop now!
                            // If we attempt to enter a new TransactionScope the com+ context will get corrupted
                            // See windows se bug 137267
                            if (!WorkflowExecutor.CheckAndProcessTransactionAborted(transactionalProperties))
                            {
                                if (transactionalProperties.TransactionScope == null)
                                {
                                    // Use TimeSpan.Zero so scope will not create timeout independent of the transaction
                                    // Use EnterpriseServicesInteropOption.Full to flow transaction to COM+
                                    transactionalProperties.TransactionScope =
                                        new TransactionScope(transactionalProperties.Transaction, TimeSpan.Zero, EnterpriseServicesInteropOption.Full);

                                    WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 0,
                                        "Workflow Runtime: Scheduler: InstanceId: {0} : Entered into TransactionScope, Current atomic activity {1}",
                                        this.RootWorkflowExecutor.InstanceIdString, atomicActivity.Name);
                                }
                            }
                        }

                        // Run the item
                        //
                        runningItem = true;
                        WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1, "Workflow Runtime: Scheduler: InstanceId: {0} : Running scheduled entry: {1}", this.RootWorkflowExecutor.InstanceIdString, item.ToString());

                        // running any entry implicitly changes some state of the workflow instance
                        this.RootWorkflowExecutor.stateChangedSincePersistence = true;

                        item.Run(this.RootWorkflowExecutor);
                    }
                    catch (Exception e)
                    {
                        if (WorkflowExecutor.IsIrrecoverableException(e))
                        {
                            // let it escape untouched; the finally block must not trace or report
                            ignoreFinallyBlock = true;
                            throw;
                        }
                        else
                        {
                            if (transactionalProperties != null)
                            {
                                transactionalProperties.TransactionState = TransactionProcessState.AbortProcessed;
                            }
                            exp = e;
                        }
                    }
                    finally
                    {
                        if (!ignoreFinallyBlock)
                        {
                            if (runningItem)
                            {
                                WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1, "Workflow Runtime: Scheduler: InstanceId: {0} : Done with running scheduled entry: {1}", this.RootWorkflowExecutor.InstanceIdString, item.ToString());
                            }

                            // Process exception
                            //
                            if (exp != null)
                            {
                                // report against the item's activity, or the context
                                // activity when the item's activity could not be resolved
                                this.RootWorkflowExecutor.ExceptionOccured(exp, itemActivity == null ? contextActivity : itemActivity, null);
                                exp = null;
                            }
                        }
                    }
                }
            } while (true);
        }

        // Dequeues the next item to execute, or returns null when nothing may run now.
        // Also maintains the 'empty' and 'threadRequested' flags under the scheduler lock.
        private SchedulableItem GetItemToRun()
        {
            SchedulableItem ret = null;

            lock (this.syncObject)
            {
                bool workToDo = false;
                if ((this.highPriorityEntriesQueue.Count > 0) || (this.normalPriorityEntriesQueue.Count > 0))
                {
                    workToDo = true;

                    // If an abort or termination of the workflow has been requested,
                    // then the workflow should try to terminate ASAP. Even transaction scopes
                    // in progress shouldn't be executed to completion. (Ref: 16534)
                    if (this.AbortOrTerminateRequested)
                    {
                        ret = null;
                    }
                    // got work to do in the scheduler
                    else if ((this.highPriorityEntriesQueue.Count > 0))
                    {
                        ret = this.highPriorityEntriesQueue.Dequeue();
                    }
                    else if (this.CanRun)
                    {
                        // the scheduler can run right now
                        //
                        // pick an entry to run
                        //
                        if (((IWorkflowCoreRuntime)this.RootWorkflowExecutor).CurrentAtomicActivity == null &&
                            (this.normalPriorityEntriesQueue.Count > 0))
                        {
                            ret = this.normalPriorityEntriesQueue.Dequeue();
                        }
                    }
                    else
                    {
                        // scheduler can't run right now.. even though there is ready work
                        // do nothing in the scheduler
                        ret = null;
                    }
                }

                if (!workToDo)
                {
                    // no ready work to do in the scheduler...
                    // we are gonna return the thread back
                    this.empty = true;
                }

                // set it to true only iff there is something to run
                this.threadRequested = (ret != null);
            }
            return ret;
        }

        // This method should be called only after we have determined that
        // this instance can start running now
        public void Resume()
        {
            canRun = true;
            RequestThreadIfWorkPending();
        }

        // Unlike Resume(), this does not change CanRun: it asks for a thread only
        // when the scheduler is already allowed to run and has scheduled work.
        public void ResumeIfRunnable()
        {
            if (!canRun)
            {
                return;
            }
            RequestThreadIfWorkPending();
        }

        // Asks the executor for a thread when the scheduler is not empty.
        private void RequestThreadIfWorkPending()
        {
            if (!empty)
            {
                // There is scheduled work
                // ask the threadprovider for a thread
                this.RootWorkflowExecutor.ScheduleForWork();
            }
        }

        #endregion Run work

        #region Schedule work

        // Enqueues an item and, if allowed, asks the executor for a thread.
        //
        // s:                      the item to schedule.
        // isInAtomicTransaction:  true puts the item on the high priority queue.
        // transacted:             true also records the item so Rollback() can remove it.
        public void ScheduleItem(SchedulableItem s, bool isInAtomicTransaction, bool transacted)
        {
            lock (this.syncObject)
            {
                WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1, "Workflow Runtime: Scheduler: InstanceId: {0} : Scheduling entry: {1}", this.RootWorkflowExecutor.InstanceIdString, s.ToString());

                // SchedulableItems in AtomicTransaction has higher priority
                Queue<SchedulableItem> q = isInAtomicTransaction ? this.highPriorityEntriesQueue : this.normalPriorityEntriesQueue;
                q.Enqueue(s);

                if (transacted)
                {
                    if (transactedEntries == null)
                    {
                        transactedEntries = new Queue<SchedulableItem>();
                    }
                    transactedEntries.Enqueue(s);
                }

                if (!this.threadRequested)
                {
                    if (this.CanRun)
                    {
                        this.RootWorkflowExecutor.ScheduleForWork();
                        this.threadRequested = true;
                    }
                }
                this.empty = false;
            }
        }

        #endregion Schedule work

        #region psuedo-transacted support

        // Forgets the transacted entries recorded so far; they stay scheduled.
        public void PostPersist()
        {
            transactedEntries = null;
        }

        // Removes every recorded transacted entry from the normal priority queue,
        // keeping the remaining entries in their original order.
        // NOTE(review): unlike ScheduleItem(), this does not take syncObject -
        // presumably callers guarantee no concurrent scheduling; confirm.
        public void Rollback()
        {
            if (transactedEntries != null && transactedEntries.Count > 0)
            {
                // make a list of non-transacted entries
                // @undone: bmalhi: transacted entries only on priority-0
                Queue<SchedulableItem> newScheduled = new Queue<SchedulableItem>();
                foreach (SchedulableItem entry in this.normalPriorityEntriesQueue)
                {
                    if (!transactedEntries.Contains(entry))
                    {
                        newScheduled.Enqueue(entry);
                    }
                }

                // clear the scheduled items
                this.normalPriorityEntriesQueue.Clear();

                // schedule the non-transacted items back
                foreach (SchedulableItem entry in newScheduled)
                {
                    this.normalPriorityEntriesQueue.Enqueue(entry);
                }

                transactedEntries = null;
            }
        }

        #endregion psuedo-transacted support
    }

    #endregion Scheduler
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- InstanceLockedException.cs
- TextEndOfLine.cs
- SearchForVirtualItemEventArgs.cs
- MdbDataFileEditor.cs
- EnumerationRangeValidationUtil.cs
- BaseTreeIterator.cs
- SqlTypesSchemaImporter.cs
- StreamWriter.cs
- ConditionalExpression.cs
- RootProjectionNode.cs
- XsdDuration.cs
- DrawingGroupDrawingContext.cs
- AutoCompleteStringCollection.cs
- DeclaredTypeValidator.cs
- RemoteX509Token.cs
- Connection.cs
- NullReferenceException.cs
- StackOverflowException.cs
- UniqueSet.cs
- CompositeScriptReferenceEventArgs.cs
- FastPropertyAccessor.cs
- DashStyles.cs
- GZipDecoder.cs
- SmtpFailedRecipientException.cs
- DispatcherProcessingDisabled.cs
- UpdateRecord.cs
- SponsorHelper.cs
- ViewKeyConstraint.cs
- EntityStoreSchemaFilterEntry.cs
- UnsafeNativeMethods.cs
- EditorPart.cs
- ProviderConnectionPoint.cs
- CodeDOMUtility.cs
- NativeCompoundFileAPIs.cs
- SyndicationLink.cs
- PixelFormat.cs
- WindowsStatic.cs
- Configuration.cs
- SmtpNetworkElement.cs
- Highlights.cs
- SupportingTokenSecurityTokenResolver.cs
- PropertyGridEditorPart.cs
- ErrorTolerantObjectWriter.cs
- LocalizabilityAttribute.cs
- Propagator.Evaluator.cs
- TimeSpanSecondsConverter.cs
- ProviderConnectionPointCollection.cs
- TabRenderer.cs
- TextBox.cs
- SerializerWriterEventHandlers.cs
- StaticFileHandler.cs
- NamespaceMapping.cs
- PropertyChangeTracker.cs
- ReferencedCollectionType.cs
- Panel.cs
- DockPattern.cs
- EncoderReplacementFallback.cs
- ObjectAnimationBase.cs
- BufferedGraphicsContext.cs
- ColumnResizeUndoUnit.cs
- FamilyMapCollection.cs
- DateTimeOffsetStorage.cs
- Select.cs
- FixedSOMSemanticBox.cs
- Encoder.cs
- AtomMaterializer.cs
- DataGridParentRows.cs
- ListArgumentProvider.cs
- XmlHelper.cs
- Vars.cs
- XPathMultyIterator.cs
- BrushConverter.cs
- Vector3DIndependentAnimationStorage.cs
- TraceLog.cs
- SqlCacheDependency.cs
- SourceElementsCollection.cs
- QilTargetType.cs
- AutoResetEvent.cs
- MembershipAdapter.cs
- OleDbDataAdapter.cs
- ClientApiGenerator.cs
- CommentEmitter.cs
- UIAgentMonitor.cs
- PerformanceCountersElement.cs
- WindowsFormsSynchronizationContext.cs
- ObjectDataSourceFilteringEventArgs.cs
- DataRelationCollection.cs
- DiagnosticsConfigurationHandler.cs
- ChooseAction.cs
- XsltLibrary.cs
- SharedUtils.cs
- SchemaDeclBase.cs
- Style.cs
- BuildProviderUtils.cs
- RegexWriter.cs
- XsdBuildProvider.cs
- ReachUIElementCollectionSerializer.cs
- WebBrowserSiteBase.cs
- RbTree.cs
- SecureUICommand.cs