Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / WF / RunTime / Hosting / DefaultWorkflowSchedulerService.cs / 1305376 / DefaultWorkflowSchedulerService.cs
using System; using System.Collections; using System.Collections.ObjectModel; using System.Collections.Generic; using System.Threading; using System.Collections.Specialized; using System.Diagnostics; using System.Workflow.Runtime; using System.Globalization; namespace System.Workflow.Runtime.Hosting { public class DefaultWorkflowSchedulerService : WorkflowSchedulerService { // next two fields controlled by locking the timerQueue private KeyedPriorityQueuetimerQueue = new KeyedPriorityQueue (); private Timer callbackTimer; private TimerCallback timerCallback; private const string MAX_SIMULTANEOUS_WORKFLOWS_KEY = "maxSimultaneousWorkflows"; private const int DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS = 5; private static TimeSpan infinite = new TimeSpan(Timeout.Infinite); private readonly int maxSimultaneousWorkflows; // Maximum number of work items allowed in ThreadPool queue private static TimeSpan fiveMinutes = new TimeSpan(0, 5, 0); // next three fields controlled by locking the waitingQueue private int numCurrentWorkers; private Queue waitingQueue; // Queue for extra items waiting to be allowed into thread pool private volatile bool running = false; private IList queueCounters; // expose internal queue length private static int DefaultThreadCount { get { return Environment.ProcessorCount == 1 ? DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS : (int)(DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS * Environment.ProcessorCount * .8); } } public DefaultWorkflowSchedulerService() : this(DefaultThreadCount) { } public DefaultWorkflowSchedulerService(int maxSimultaneousWorkflows) : base() { if (maxSimultaneousWorkflows < 1) throw new ArgumentOutOfRangeException(MAX_SIMULTANEOUS_WORKFLOWS_KEY, maxSimultaneousWorkflows, String.Empty); this.maxSimultaneousWorkflows = maxSimultaneousWorkflows; init(); } public DefaultWorkflowSchedulerService(NameValueCollection parameters) : base() { if (parameters == null) throw new ArgumentNullException("parameters"); maxSimultaneousWorkflows = DefaultThreadCount; foreach (string key in parameters.Keys) { if (key == null) throw new ArgumentException(String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, "null")); string p = parameters[key]; if (!key.Equals(MAX_SIMULTANEOUS_WORKFLOWS_KEY, StringComparison.OrdinalIgnoreCase)) throw new ArgumentException(String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, key)); if (!int.TryParse(p, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.CurrentCulture, out maxSimultaneousWorkflows)) throw new FormatException(MAX_SIMULTANEOUS_WORKFLOWS_KEY); } if (maxSimultaneousWorkflows < 1) throw new ArgumentOutOfRangeException(MAX_SIMULTANEOUS_WORKFLOWS_KEY, maxSimultaneousWorkflows, String.Empty); init(); } private void init() { timerCallback = new TimerCallback(OnTimerCallback); timerQueue.FirstElementChanged += OnFirstElementChanged; waitingQueue = new Queue (); } public int MaxSimultaneousWorkflows { get { return maxSimultaneousWorkflows; } } internal protected override void Schedule(WaitCallback callback, Guid workflowInstanceId) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Scheduling work for instance {0}", workflowInstanceId); if (callback == null) throw new ArgumentNullException("callback"); if (workflowInstanceId == Guid.Empty) throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "workflowInstanceId")); // Add the work item to our internal queue and signal the ProcessQueue thread EnqueueWorkItem( new WorkItem(callback, workflowInstanceId) ); } internal protected override void Schedule(WaitCallback callback, Guid workflowInstanceId, DateTime whenUtc, Guid timerId) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Scheduling work for instance {0} on timer ID {1} in {2}", workflowInstanceId, timerId, (whenUtc - DateTime.UtcNow)); if (callback == null) throw new ArgumentNullException("callback"); if (timerId == Guid.Empty) throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "timerId")); if (workflowInstanceId == Guid.Empty) throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "workflowInstanceId")); CallbackInfo ci = new CallbackInfo(this, callback, workflowInstanceId, whenUtc); lock (timerQueue) { timerQueue.Enqueue(timerId, ci, whenUtc); } } internal protected override void Cancel(Guid timerId) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Cancelling work with timer ID {0}", timerId); if (timerId == Guid.Empty) throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "timerId"), "timerId"); lock (timerQueue) { timerQueue.Remove(timerId); } } override protected void OnStarted() { lock (timerQueue) { base.OnStarted(); CallbackInfo ci = timerQueue.Peek(); if (ci != null) callbackTimer = CreateTimerCallback(ci); running = true; } lock (waitingQueue) { int nToStart = Math.Min(maxSimultaneousWorkflows, waitingQueue.Count); for (int i = 0; i < nToStart; i++) { if (ThreadPool.QueueUserWorkItem(QueueWorkerProcess)) { numCurrentWorkers++; } } } if (queueCounters == null && this.Runtime.PerformanceCounterManager != null) { queueCounters = this.Runtime.PerformanceCounterManager.CreateCounters(ExecutionStringManager.PerformanceCounterWorkflowsWaitingName); } } protected internal override void Stop() { lock (timerQueue) { base.Stop(); if (callbackTimer != null) { callbackTimer.Dispose(); callbackTimer = null; } running = false; } lock (waitingQueue) { while (numCurrentWorkers > 0) { Monitor.Wait(waitingQueue); } } } private void OnFirstElementChanged(object source, KeyedPriorityQueueHeadChangedEventArgs e) { // timerQueue must have been locked by operation that caused this event to fire if (callbackTimer != null) { callbackTimer.Dispose(); callbackTimer = null; } if (e.NewFirstElement != null && this.State == WorkflowRuntimeServiceState.Started) { callbackTimer = CreateTimerCallback(e.NewFirstElement); } } private void OnTimerCallback(object ignored) { //Make sure activity ID comes out of Threadpool are initialized to null. Trace.CorrelationManager.ActivityId = Guid.Empty; CallbackInfo ci = null; bool fire = false; try { lock (timerQueue) { if (State == WorkflowRuntimeServiceState.Started) { ci = timerQueue.Peek(); if (ci != null) { if (ci.IsExpired) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Timeout occured for timer for instance {0}", ci.State); timerQueue.Dequeue(); fire = true; } else { callbackTimer = CreateTimerCallback(ci); } } } } if(fire && ci != null) ci.Callback(ci.State); } // Ignore cases where the workflow has been stolen out from under us catch (WorkflowOwnershipException) { } catch (ThreadAbortException e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Timeout for instance, {0} threw exception {1}", ci == null ? null : ci.State, e.Message); RaiseServicesExceptionNotHandledEvent(e, (Guid)ci.State); throw; } catch (Exception e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Timeout for instance, {0} threw exception {1}", ci == null ? null : ci.State, e.Message); RaiseServicesExceptionNotHandledEvent(e, (Guid)ci.State); } } private Timer CreateTimerCallback(CallbackInfo info) { DateTime now = DateTime.UtcNow; TimeSpan span = (info.When > now) ? info.When - now : TimeSpan.Zero; if (span > fiveMinutes) // never let more than five minutes go by without checking span = fiveMinutes; return new Timer(timerCallback, info.State, span, infinite); } private void EnqueueWorkItem( WorkItem workItem ) { lock (waitingQueue) { waitingQueue.Enqueue(workItem); if (running && numCurrentWorkers < maxSimultaneousWorkflows) { if (ThreadPool.QueueUserWorkItem(this.QueueWorkerProcess)) { numCurrentWorkers++; } } } if (queueCounters != null) { foreach (PerformanceCounter p in queueCounters) { p.RawValue = waitingQueue.Count; } } } private void QueueWorkerProcess(object state /*unused*/) { //Make sure activity ID comes out of Threadpool are initialized to null. Trace.CorrelationManager.ActivityId = Guid.Empty; while (true) { WorkItem workItem; lock (waitingQueue) { if (waitingQueue.Count == 0 || !running) { numCurrentWorkers--; Monitor.Pulse(waitingQueue); return; } workItem = waitingQueue.Dequeue(); } if (queueCounters != null) { foreach (PerformanceCounter p in queueCounters) { p.RawValue = waitingQueue.Count; } } workItem.Invoke(this); } } internal class WorkItem { private WaitCallback callback; private object state; public WorkItem(WaitCallback callback, object state) { this.callback = callback; this.state = state; } public WaitCallback Callback { get { return callback; } } public void Invoke(WorkflowSchedulerService service) { try { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Running workflow {0}", state); Callback(state); } catch (Exception e) { if (WorkflowExecutor.IsIrrecoverableException(e)) { throw; } else { service.RaiseExceptionNotHandledEvent(e, (Guid)state); } } } } internal class CallbackInfo { WaitCallback callback; object state; DateTime when; WorkflowSchedulerService service; public CallbackInfo(WorkflowSchedulerService service, WaitCallback callback, object state, DateTime when) { this.service = service; this.callback = callback; this.state = state; this.when = when; } public DateTime When { get { return when; } } public bool IsExpired { get { return DateTime.UtcNow >= when; } } public object State { get { return state; } } public WaitCallback Callback { get { return callback; } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved. using System; using System.Collections; using System.Collections.ObjectModel; using System.Collections.Generic; using System.Threading; using System.Collections.Specialized; using System.Diagnostics; using System.Workflow.Runtime; using System.Globalization; namespace System.Workflow.Runtime.Hosting { public class DefaultWorkflowSchedulerService : WorkflowSchedulerService { // next two fields controlled by locking the timerQueue private KeyedPriorityQueue timerQueue = new KeyedPriorityQueue (); private Timer callbackTimer; private TimerCallback timerCallback; private const string MAX_SIMULTANEOUS_WORKFLOWS_KEY = "maxSimultaneousWorkflows"; private const int DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS = 5; private static TimeSpan infinite = new TimeSpan(Timeout.Infinite); private readonly int maxSimultaneousWorkflows; // Maximum number of work items allowed in ThreadPool queue private static TimeSpan fiveMinutes = new TimeSpan(0, 5, 0); // next three fields controlled by locking the waitingQueue private int numCurrentWorkers; private Queue waitingQueue; // Queue for extra items waiting to be allowed into thread pool private volatile bool running = false; private IList queueCounters; // expose internal queue length private static int DefaultThreadCount { get { return Environment.ProcessorCount == 1 ? DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS : (int)(DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS * Environment.ProcessorCount * .8); } } public DefaultWorkflowSchedulerService() : this(DefaultThreadCount) { } public DefaultWorkflowSchedulerService(int maxSimultaneousWorkflows) : base() { if (maxSimultaneousWorkflows < 1) throw new ArgumentOutOfRangeException(MAX_SIMULTANEOUS_WORKFLOWS_KEY, maxSimultaneousWorkflows, String.Empty); this.maxSimultaneousWorkflows = maxSimultaneousWorkflows; init(); } public DefaultWorkflowSchedulerService(NameValueCollection parameters) : base() { if (parameters == null) throw new ArgumentNullException("parameters"); maxSimultaneousWorkflows = DefaultThreadCount; foreach (string key in parameters.Keys) { if (key == null) throw new ArgumentException(String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, "null")); string p = parameters[key]; if (!key.Equals(MAX_SIMULTANEOUS_WORKFLOWS_KEY, StringComparison.OrdinalIgnoreCase)) throw new ArgumentException(String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, key)); if (!int.TryParse(p, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.CurrentCulture, out maxSimultaneousWorkflows)) throw new FormatException(MAX_SIMULTANEOUS_WORKFLOWS_KEY); } if (maxSimultaneousWorkflows < 1) throw new ArgumentOutOfRangeException(MAX_SIMULTANEOUS_WORKFLOWS_KEY, maxSimultaneousWorkflows, String.Empty); init(); } private void init() { timerCallback = new TimerCallback(OnTimerCallback); timerQueue.FirstElementChanged += OnFirstElementChanged; waitingQueue = new Queue (); } public int MaxSimultaneousWorkflows { get { return maxSimultaneousWorkflows; } } internal protected override void Schedule(WaitCallback callback, Guid workflowInstanceId) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Scheduling work for instance {0}", workflowInstanceId); if (callback == null) throw new ArgumentNullException("callback"); if (workflowInstanceId == Guid.Empty) throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "workflowInstanceId")); // Add the work item to our internal queue and signal the ProcessQueue thread EnqueueWorkItem( new WorkItem(callback, workflowInstanceId) ); } internal protected override void Schedule(WaitCallback callback, Guid workflowInstanceId, DateTime whenUtc, Guid timerId) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Scheduling work for instance {0} on timer ID {1} in {2}", workflowInstanceId, timerId, (whenUtc - DateTime.UtcNow)); if (callback == null) throw new ArgumentNullException("callback"); if (timerId == Guid.Empty) throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "timerId")); if (workflowInstanceId == Guid.Empty) throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "workflowInstanceId")); CallbackInfo ci = new CallbackInfo(this, callback, workflowInstanceId, whenUtc); lock (timerQueue) { timerQueue.Enqueue(timerId, ci, whenUtc); } } internal protected override void Cancel(Guid timerId) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Cancelling work with timer ID {0}", timerId); if (timerId == Guid.Empty) throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "timerId"), "timerId"); lock (timerQueue) { timerQueue.Remove(timerId); } } override protected void OnStarted() { lock (timerQueue) { base.OnStarted(); CallbackInfo ci = timerQueue.Peek(); if (ci != null) callbackTimer = CreateTimerCallback(ci); running = true; } lock (waitingQueue) { int nToStart = Math.Min(maxSimultaneousWorkflows, waitingQueue.Count); for (int i = 0; i < nToStart; i++) { if (ThreadPool.QueueUserWorkItem(QueueWorkerProcess)) { numCurrentWorkers++; } } } if (queueCounters == null && this.Runtime.PerformanceCounterManager != null) { queueCounters = this.Runtime.PerformanceCounterManager.CreateCounters(ExecutionStringManager.PerformanceCounterWorkflowsWaitingName); } } protected internal override void Stop() { lock (timerQueue) { base.Stop(); if (callbackTimer != null) { callbackTimer.Dispose(); callbackTimer = null; } running = false; } lock (waitingQueue) { while (numCurrentWorkers > 0) { Monitor.Wait(waitingQueue); } } } private void OnFirstElementChanged(object source, KeyedPriorityQueueHeadChangedEventArgs e) { // timerQueue must have been locked by operation that caused this event to fire if (callbackTimer != null) { callbackTimer.Dispose(); callbackTimer = null; } if (e.NewFirstElement != null && this.State == WorkflowRuntimeServiceState.Started) { callbackTimer = CreateTimerCallback(e.NewFirstElement); } } private void OnTimerCallback(object ignored) { //Make sure activity ID comes out of Threadpool are initialized to null. Trace.CorrelationManager.ActivityId = Guid.Empty; CallbackInfo ci = null; bool fire = false; try { lock (timerQueue) { if (State == WorkflowRuntimeServiceState.Started) { ci = timerQueue.Peek(); if (ci != null) { if (ci.IsExpired) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Timeout occured for timer for instance {0}", ci.State); timerQueue.Dequeue(); fire = true; } else { callbackTimer = CreateTimerCallback(ci); } } } } if(fire && ci != null) ci.Callback(ci.State); } // Ignore cases where the workflow has been stolen out from under us catch (WorkflowOwnershipException) { } catch (ThreadAbortException e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Timeout for instance, {0} threw exception {1}", ci == null ? null : ci.State, e.Message); RaiseServicesExceptionNotHandledEvent(e, (Guid)ci.State); throw; } catch (Exception e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Timeout for instance, {0} threw exception {1}", ci == null ? null : ci.State, e.Message); RaiseServicesExceptionNotHandledEvent(e, (Guid)ci.State); } } private Timer CreateTimerCallback(CallbackInfo info) { DateTime now = DateTime.UtcNow; TimeSpan span = (info.When > now) ? info.When - now : TimeSpan.Zero; if (span > fiveMinutes) // never let more than five minutes go by without checking span = fiveMinutes; return new Timer(timerCallback, info.State, span, infinite); } private void EnqueueWorkItem( WorkItem workItem ) { lock (waitingQueue) { waitingQueue.Enqueue(workItem); if (running && numCurrentWorkers < maxSimultaneousWorkflows) { if (ThreadPool.QueueUserWorkItem(this.QueueWorkerProcess)) { numCurrentWorkers++; } } } if (queueCounters != null) { foreach (PerformanceCounter p in queueCounters) { p.RawValue = waitingQueue.Count; } } } private void QueueWorkerProcess(object state /*unused*/) { //Make sure activity ID comes out of Threadpool are initialized to null. Trace.CorrelationManager.ActivityId = Guid.Empty; while (true) { WorkItem workItem; lock (waitingQueue) { if (waitingQueue.Count == 0 || !running) { numCurrentWorkers--; Monitor.Pulse(waitingQueue); return; } workItem = waitingQueue.Dequeue(); } if (queueCounters != null) { foreach (PerformanceCounter p in queueCounters) { p.RawValue = waitingQueue.Count; } } workItem.Invoke(this); } } internal class WorkItem { private WaitCallback callback; private object state; public WorkItem(WaitCallback callback, object state) { this.callback = callback; this.state = state; } public WaitCallback Callback { get { return callback; } } public void Invoke(WorkflowSchedulerService service) { try { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Running workflow {0}", state); Callback(state); } catch (Exception e) { if (WorkflowExecutor.IsIrrecoverableException(e)) { throw; } else { service.RaiseExceptionNotHandledEvent(e, (Guid)state); } } } } internal class CallbackInfo { WaitCallback callback; object state; DateTime when; WorkflowSchedulerService service; public CallbackInfo(WorkflowSchedulerService service, WaitCallback callback, object state, DateTime when) { this.service = service; this.callback = callback; this.state = state; this.when = when; } public DateTime When { get { return when; } } public bool IsExpired { get { return DateTime.UtcNow >= when; } } public object State { get { return state; } } public WaitCallback Callback { get { return callback; } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- DataPagerField.cs
- DataColumnPropertyDescriptor.cs
- ValidationManager.cs
- DbConnectionOptions.cs
- SqlBuffer.cs
- EpmAttributeNameBuilder.cs
- SecurityHelper.cs
- DeferredElementTreeState.cs
- ImplicitInputBrush.cs
- RelationshipConstraintValidator.cs
- CodePropertyReferenceExpression.cs
- ConversionValidationRule.cs
- XPathNodeIterator.cs
- PersonalizationAdministration.cs
- ContextMenuService.cs
- OutputCacheProfileCollection.cs
- FillRuleValidation.cs
- FontFamilyIdentifier.cs
- FontConverter.cs
- DescendentsWalkerBase.cs
- SqlParameterizer.cs
- DataView.cs
- DCSafeHandle.cs
- PlanCompiler.cs
- SortDescription.cs
- PlanCompiler.cs
- ConfigViewGenerator.cs
- QfeChecker.cs
- ExplicitDiscriminatorMap.cs
- MediaPlayerState.cs
- SQLSingleStorage.cs
- FilePrompt.cs
- TypographyProperties.cs
- EUCJPEncoding.cs
- FrameworkContentElementAutomationPeer.cs
- ColumnResizeUndoUnit.cs
- XmlDataImplementation.cs
- RootDesignerSerializerAttribute.cs
- WizardStepBase.cs
- ImageListStreamer.cs
- ScriptDescriptor.cs
- SessionIDManager.cs
- Panel.cs
- ObjectStateEntryDbDataRecord.cs
- EntityDesignerDataSourceView.cs
- MemberNameValidator.cs
- Size3D.cs
- MailHeaderInfo.cs
- TextBox.cs
- BamlLocalizableResourceKey.cs
- RawStylusActions.cs
- StylusLogic.cs
- ToolStripProfessionalLowResolutionRenderer.cs
- __Error.cs
- DashStyle.cs
- XmlSchemaValidationException.cs
- DbConnectionHelper.cs
- ShaperBuffers.cs
- LogSwitch.cs
- ProtocolsSection.cs
- DbConnectionInternal.cs
- CalendarDay.cs
- ETagAttribute.cs
- NegatedCellConstant.cs
- DataRowComparer.cs
- SwitchLevelAttribute.cs
- EventDescriptor.cs
- RtfControlWordInfo.cs
- ProtectedConfigurationSection.cs
- RepeaterCommandEventArgs.cs
- XmlSerializerVersionAttribute.cs
- GenericsNotImplementedException.cs
- ResXResourceWriter.cs
- ListSortDescription.cs
- Freezable.cs
- FunctionCommandText.cs
- DesignerActionUIService.cs
- ToolCreatedEventArgs.cs
- HtmlShim.cs
- HtmlLink.cs
- MouseEvent.cs
- RIPEMD160Managed.cs
- NamespaceDecl.cs
- ReferenceSchema.cs
- ListBox.cs
- Pair.cs
- DynamicDocumentPaginator.cs
- SerialPort.cs
- DataGridHelper.cs
- TerminatorSinks.cs
- Types.cs
- ScrollData.cs
- TextMessageEncoder.cs
- StringValidatorAttribute.cs
- QuerySubExprEliminator.cs
- WorkflowPrinting.cs
- ColumnResizeAdorner.cs
- SchemaCompiler.cs
- Rect3DValueSerializer.cs
- ExpressionNormalizer.cs