DefaultWorkflowSchedulerService.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / WF / RunTime / Hosting / DefaultWorkflowSchedulerService.cs / 1305376 / DefaultWorkflowSchedulerService.cs

                            using System; 
using System.Collections;
using System.Collections.ObjectModel;
using System.Collections.Generic;
using System.Threading; 
using System.Collections.Specialized;
using System.Diagnostics; 
using System.Workflow.Runtime; 
using System.Globalization;
 
namespace System.Workflow.Runtime.Hosting
{
    public class DefaultWorkflowSchedulerService : WorkflowSchedulerService
    { 
        // next two fields controlled by locking the timerQueue
        private KeyedPriorityQueue timerQueue = new KeyedPriorityQueue(); 
        private Timer callbackTimer; 

        private TimerCallback timerCallback; 
        private const string MAX_SIMULTANEOUS_WORKFLOWS_KEY = "maxSimultaneousWorkflows";
        private const int DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS = 5;
        private static TimeSpan infinite = new TimeSpan(Timeout.Infinite);
        private readonly int maxSimultaneousWorkflows;       // Maximum number of work items allowed in ThreadPool queue 
        private static TimeSpan fiveMinutes = new TimeSpan(0, 5, 0);
 
        // next three fields controlled by locking the waitingQueue 
        private int numCurrentWorkers;
        private Queue waitingQueue;       // Queue for extra items waiting to be allowed into thread pool 
        private volatile bool running = false;

        private IList queueCounters;    // expose internal queue length
 
        private static int DefaultThreadCount
        { 
            get 
            {
                return Environment.ProcessorCount == 1 
                    ? DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS
                    : (int)(DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS * Environment.ProcessorCount * .8);
            }
        } 

        public DefaultWorkflowSchedulerService() 
            : this(DefaultThreadCount) 
        {
        } 


        public DefaultWorkflowSchedulerService(int maxSimultaneousWorkflows)
            : base() 
        {
            if (maxSimultaneousWorkflows < 1) 
                throw new ArgumentOutOfRangeException(MAX_SIMULTANEOUS_WORKFLOWS_KEY, maxSimultaneousWorkflows, String.Empty); 
            this.maxSimultaneousWorkflows = maxSimultaneousWorkflows;
            init(); 
        }

        public DefaultWorkflowSchedulerService(NameValueCollection parameters)
            : base() 
        {
            if (parameters == null) 
                throw new ArgumentNullException("parameters"); 

            maxSimultaneousWorkflows = DefaultThreadCount; 
            foreach (string key in parameters.Keys)
            {
                if (key == null)
                    throw new ArgumentException(String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, "null")); 
                string p = parameters[key];
                if (!key.Equals(MAX_SIMULTANEOUS_WORKFLOWS_KEY, StringComparison.OrdinalIgnoreCase)) 
                    throw new ArgumentException(String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, key)); 
                if (!int.TryParse(p, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.CurrentCulture, out maxSimultaneousWorkflows))
                    throw new FormatException(MAX_SIMULTANEOUS_WORKFLOWS_KEY); 
            }

            if (maxSimultaneousWorkflows < 1)
                throw new ArgumentOutOfRangeException(MAX_SIMULTANEOUS_WORKFLOWS_KEY, maxSimultaneousWorkflows, String.Empty); 

            init(); 
        } 

        private void init() 
        {
            timerCallback = new TimerCallback(OnTimerCallback);
            timerQueue.FirstElementChanged += OnFirstElementChanged;
            waitingQueue = new Queue(); 
        }
 
 
        public int MaxSimultaneousWorkflows
        { 
            get { return maxSimultaneousWorkflows; }
        }

        internal protected override void Schedule(WaitCallback callback, Guid workflowInstanceId) 
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Scheduling work for instance {0}", workflowInstanceId); 
 
            if (callback == null)
                throw new ArgumentNullException("callback"); 
            if (workflowInstanceId == Guid.Empty)
                throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "workflowInstanceId"));

            // Add the work item to our internal queue and signal the ProcessQueue thread 
            EnqueueWorkItem( new WorkItem(callback, workflowInstanceId) );
        } 
 
        internal protected override void Schedule(WaitCallback callback, Guid workflowInstanceId, DateTime whenUtc, Guid timerId)
        { 
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Scheduling work for instance {0} on timer ID {1} in {2}", workflowInstanceId, timerId, (whenUtc - DateTime.UtcNow));

            if (callback == null)
                throw new ArgumentNullException("callback"); 
            if (timerId == Guid.Empty)
                throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "timerId")); 
            if (workflowInstanceId == Guid.Empty) 
                throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "workflowInstanceId"));
 
            CallbackInfo ci = new CallbackInfo(this, callback, workflowInstanceId, whenUtc);

            lock (timerQueue)
            { 
                timerQueue.Enqueue(timerId, ci, whenUtc);
            } 
        } 

        internal protected override void Cancel(Guid timerId) 
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Cancelling work with timer ID {0}", timerId);

            if (timerId == Guid.Empty) 
                throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "timerId"), "timerId");
 
            lock (timerQueue) 
            {
                timerQueue.Remove(timerId); 
            }
        }

        override protected void OnStarted() 
        {
            lock (timerQueue) 
            { 
                base.OnStarted();
                CallbackInfo ci = timerQueue.Peek(); 
                if (ci != null)
                    callbackTimer = CreateTimerCallback(ci);
                running = true;
            } 
            lock (waitingQueue)
            { 
                int nToStart = Math.Min(maxSimultaneousWorkflows, waitingQueue.Count); 
                for (int i = 0; i < nToStart; i++)
                { 
                    if (ThreadPool.QueueUserWorkItem(QueueWorkerProcess))
                    {
                        numCurrentWorkers++;
                    } 
                }
            } 
            if (queueCounters == null && this.Runtime.PerformanceCounterManager != null) 
            {
                queueCounters = this.Runtime.PerformanceCounterManager.CreateCounters(ExecutionStringManager.PerformanceCounterWorkflowsWaitingName); 
            }
        }

        protected internal override void Stop() 
        {
            lock (timerQueue) 
            { 
                base.Stop();
                if (callbackTimer != null) 
                {
                    callbackTimer.Dispose();
                    callbackTimer = null;
                } 
                running = false;
            } 
            lock (waitingQueue) 
            {
                while (numCurrentWorkers > 0) 
                {
                    Monitor.Wait(waitingQueue);
                }
            } 
        }
 
        private void OnFirstElementChanged(object source, KeyedPriorityQueueHeadChangedEventArgs e) 
        {
            // timerQueue must have been locked by operation that caused this event to fire 

            if (callbackTimer != null)
            {
                callbackTimer.Dispose(); 
                callbackTimer = null;
            } 
            if (e.NewFirstElement != null && this.State == WorkflowRuntimeServiceState.Started) 
            {
                callbackTimer = CreateTimerCallback(e.NewFirstElement); 
            }
        }

        private void OnTimerCallback(object ignored) 
        {
            //Make sure activity ID comes out of Threadpool are initialized to null. 
            Trace.CorrelationManager.ActivityId = Guid.Empty; 

            CallbackInfo ci = null; 
            bool fire = false;
            try
            {
                lock (timerQueue) 
                {
                    if (State == WorkflowRuntimeServiceState.Started) 
                    { 
                        ci = timerQueue.Peek();
                        if (ci != null) 
                        {
                            if (ci.IsExpired)
                            {
                                WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Timeout occured for timer for instance {0}", ci.State); 
                                timerQueue.Dequeue();
                                fire = true; 
                            } 
                            else
                            { 
                                callbackTimer = CreateTimerCallback(ci);
                            }
                        }
                    } 
                }
                if(fire && ci != null) 
                    ci.Callback(ci.State); 
            }
            // Ignore cases where the workflow has been stolen out from under us 
            catch (WorkflowOwnershipException)
            { }
            catch (ThreadAbortException e)
            { 
                WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Timeout for instance, {0} threw exception {1}", ci == null ? null : ci.State, e.Message);
                RaiseServicesExceptionNotHandledEvent(e, (Guid)ci.State); 
                throw; 
            }
            catch (Exception e) 
            {
                WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Timeout for instance, {0} threw exception {1}", ci == null ? null : ci.State, e.Message);
                RaiseServicesExceptionNotHandledEvent(e, (Guid)ci.State);
            } 
        }
 
        private Timer CreateTimerCallback(CallbackInfo info) 
        {
            DateTime now = DateTime.UtcNow; 
            TimeSpan span = (info.When > now) ? info.When - now : TimeSpan.Zero;
            if (span > fiveMinutes) // never let more than five minutes go by without checking
                span = fiveMinutes;
            return new Timer(timerCallback, info.State, span, infinite); 
        }
 
        private void EnqueueWorkItem( WorkItem workItem ) 
        {
            lock (waitingQueue) 
            {
                waitingQueue.Enqueue(workItem);
                if (running && numCurrentWorkers < maxSimultaneousWorkflows)
                { 
                    if (ThreadPool.QueueUserWorkItem(this.QueueWorkerProcess))
                    { 
                        numCurrentWorkers++; 
                    }
                } 
            }
            if (queueCounters != null)
            {
                foreach (PerformanceCounter p in queueCounters) 
                {
                    p.RawValue = waitingQueue.Count; 
                } 
            }
        } 

        private void QueueWorkerProcess(object state /*unused*/)
        {
            //Make sure activity ID comes out of Threadpool are initialized to null. 
            Trace.CorrelationManager.ActivityId = Guid.Empty;
 
            while (true) 
            {
                WorkItem workItem; 
                lock (waitingQueue)
                {
                    if (waitingQueue.Count == 0 || !running)
                    { 
                        numCurrentWorkers--;
                        Monitor.Pulse(waitingQueue); 
                        return; 
                    }
                    workItem = waitingQueue.Dequeue(); 
                }
                if (queueCounters != null)
                {
                    foreach (PerformanceCounter p in queueCounters) 
                    {
                        p.RawValue = waitingQueue.Count; 
                    } 
                }
                workItem.Invoke(this); 
            }
        }

 
        internal class WorkItem
        { 
            private WaitCallback callback; 
            private object state;
 
            public WorkItem(WaitCallback callback, object state)
            {
                this.callback = callback;
                this.state = state; 
            }
 
            public WaitCallback Callback 
            {
                get { return callback; } 
            }

            public void Invoke(WorkflowSchedulerService service)
            { 
                try
                { 
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Running workflow {0}", state); 
                    Callback(state);
                } 
                catch (Exception e)
                {
                    if (WorkflowExecutor.IsIrrecoverableException(e))
                    { 
                        throw;
                    } 
                    else 
                    {
                        service.RaiseExceptionNotHandledEvent(e, (Guid)state); 
                    }
                }
            }
        } 

        internal class CallbackInfo 
        { 
            WaitCallback callback;
            object state; 
            DateTime when;
            WorkflowSchedulerService service;

            public CallbackInfo(WorkflowSchedulerService service, WaitCallback callback, object state, DateTime when) 
            {
                this.service = service; 
                this.callback = callback; 
                this.state = state;
                this.when = when; 
            }

            public DateTime When
            { 
                get { return when; }
            } 
 
            public bool IsExpired
            { 
                get { return DateTime.UtcNow >= when; }
            }

            public object State 
            {
                get { return state; } 
            } 

            public WaitCallback Callback 
            {
                get { return callback; }
            }
        } 
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK