TaskScheduler.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / clr / src / BCL / System / Threading / Tasks / TaskScheduler.cs / 1305376 / TaskScheduler.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// TaskScheduler.cs 
//
// [....] 
//
// This file contains the primary interface and management of tasks and queues.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 

using System; 
using System.Collections.Generic; 
using System.Globalization;
using System.Threading; 
using System.Security;
using System.Security.Permissions;
using System.Collections.Concurrent;
using System.Diagnostics.Contracts; 
using System.Diagnostics;
using System.Runtime.CompilerServices; 
 
namespace System.Threading.Tasks
{ 

    /// 
    /// Represents an abstract scheduler for tasks.
    ///  
    /// 
    ///  
    /// TaskScheduler acts as the extension point for all 
    /// pluggable scheduling logic.  This includes mechanisms such as how to schedule a task for execution, and
    /// how scheduled tasks should be exposed to debuggers. 
    /// 
    /// 
    /// All members of the abstract  type are thread-safe
    /// and may be used from multiple threads concurrently. 
    /// 
    ///  
    [DebuggerDisplay("Id={Id}")] 
    [DebuggerTypeProxy(typeof(SystemThreadingTasks_TaskSchedulerDebugView))]
    [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)] 
    [PermissionSet(SecurityAction.InheritanceDemand, Unrestricted = true)]
    public abstract class TaskScheduler
    {
        //////////////////////////////////////////////////////////// 
        //
        // User Provided Methods and Properties 
        // 

        ///  
        /// Queues a Task to the scheduler.
        /// 
        /// 
        ///  
        /// A class derived from TaskScheduler
        /// implements this method to accept tasks being scheduled on the scheduler. 
        /// A typical implementation would store the task in an internal data structure, which would 
        /// be serviced by threads that would execute those tasks at some time in the future.
        ///  
        /// 
        /// This method is only meant to be called by the .NET Framework and
        /// should not be called directly by the derived class. This is necessary
        /// for maintaining the consistency of the system. 
        /// 
        ///  
        /// The Task to be queued. 
        /// The  argument is null.
        [SecurityCritical] 
        protected internal abstract void QueueTask(Task task);

        /// 
        /// Determines whether the provided Task 
        /// can be executed synchronously in this call, and if it can, executes it.
        ///  
        ///  
        /// 
        /// A class derived from TaskScheduler implements this function to 
        /// support inline execution of a task on a thread that initiates a wait on that task object. Inline
        /// execution is optional, and the request may be rejected by returning false. However, better
        /// scalability typically results the more tasks that can be inlined, and in fact a scheduler that
        /// inlines too little may be prone to deadlocks. A proper implementation should ensure that a 
        /// request executing under the policies guaranteed by the scheduler can successfully inline. For
        /// example, if a scheduler uses a dedicated thread to execute tasks, any inlining requests from that 
        /// thread should succeed. 
        /// 
        ///  
        /// If a scheduler decides to perform the inline execution, it should do so by calling to the base
        /// TaskScheduler's
        /// TryExecuteTask method with the provided task object, propagating
        /// the return value. It may also be appropriate for the scheduler to remove an inlined task from its 
        /// internal data structures if it decides to honor the inlining request. Note, however, that under
        /// some circumstances a scheduler may be asked to inline a task that was not previously provided to 
        /// it with the  method. 
        /// 
        ///  
        /// The derived scheduler is responsible for making sure that the calling thread is suitable for
        /// executing the given task as far as its own scheduling and execution policies are concerned.
        /// 
        ///  
        /// The Task to be
        /// executed. 
        /// A Boolean denoting whether or not task has previously been 
        /// queued. If this parameter is True, then the task may have been previously queued (scheduled); if
        /// False, then the task is known not to have been queued, and this call is being made in order to 
        /// execute the task inline without queueing it.
        /// A Boolean value indicating whether the task was executed inline.
        /// The  argument is
        /// null. 
        /// The  was already
        /// executed. 
        [SecurityCritical] 
        protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);
 
        /// 
        /// Generates an enumerable of Task instances
        /// currently queued to the scheduler waiting to be executed.
        ///  
        /// 
        ///  
        /// A class derived from  implements this method in order to support 
        /// integration with debuggers. This method will only be invoked by the .NET Framework when the
        /// debugger requests access to the data. The enumerable returned will be traversed by debugging 
        /// utilities to access the tasks currently queued to this scheduler, enabling the debugger to
        /// provide a representation of this information in the user interface.
        /// 
        ///  
        /// It is important to note that, when this method is called, all other threads in the process will
        /// be frozen. Therefore, it's important to avoid synchronization with other threads that may lead to 
        /// blocking. If synchronization is necessary, the method should prefer to throw a 
        /// than to block, which could cause a debugger to experience delays. Additionally, this method and 
        /// the enumerable returned must not modify any globally visible state.
        /// 
        /// 
        /// The returned enumerable should never be null. If there are currently no queued tasks, an empty 
        /// enumerable should be returned instead.
        ///  
        ///  
        /// For developers implementing a custom debugger, this method shouldn't be called directly, but
        /// rather this functionality should be accessed through the internal wrapper method 
        /// GetScheduledTasksForDebugger:
        /// internal Task[] GetScheduledTasksForDebugger(). This method returns an array of tasks,
        /// rather than an enumerable. In order to retrieve a list of active schedulers, a debugger may use
        /// another internal method: internal static TaskScheduler[] GetTaskSchedulersForDebugger(). 
        /// This static method returns an array of all active TaskScheduler instances.
        /// GetScheduledTasksForDebugger then may be used on each of these scheduler instances to retrieve 
        /// the list of scheduled tasks for each. 
        /// 
        ///  
        /// An enumerable that allows traversal of tasks currently queued to this scheduler.
        /// 
        /// 
        /// This scheduler is unable to generate a list of queued tasks at this time. 
        /// 
        [SecurityCritical] 
        protected abstract IEnumerable GetScheduledTasks(); 

        ///  
        /// Indicates the maximum concurrency level this
        ///   is able to support.
        /// 
        public virtual Int32 MaximumConcurrencyLevel 
        {
            get 
            { 
                return Int32.MaxValue;
            } 
        }


        //////////////////////////////////////////////////////////// 
        //
        // Internal overridable methods 
        // 

        ///  
        /// Retrieves some thread static state that can be cached and passed to multiple
        /// TryRunInline calls, avoiding superflous TLS fetches.
        /// 
        /// A bag of TLS state (or null if none exists). 
        [SecuritySafeCritical]
        internal virtual object GetThreadStatics() 
        { 
            return null;
        } 

        /// 
        /// Attempts to execute the target task synchronously.
        ///  
        /// The task to run.
        /// True if the task may have been previously queued, 
        /// false if the task was absolutely not previously queued. 
        /// The state retrieved from GetThreadStatics
        /// True if it ran, false otherwise. 
        [SecuritySafeCritical]
        internal bool TryRunInline(Task task, bool taskWasPreviouslyQueued, object threadStatics)
        {
            // Do not inline unstarted tasks (i.e., task.ExecutingTaskScheduler == null). 
            // Do not inline TaskCompletionSource-style (a.k.a. "promise") tasks.
            // No need to attempt inlining if the task body was already run (i.e. either TASK_STATE_DELEGATE_INVOKED or TASK_STATE_CANCELED bits set) 
            TaskScheduler ets = task.ExecutingTaskScheduler; 

            // Delegate cross-scheduler inlining requests to target scheduler 
            if(ets != this && ets !=null) return ets.TryRunInline(task, taskWasPreviouslyQueued);

            if( (ets == null) ||
                (task.m_action == null) || 
                task.IsDelegateInvoked ||
                task.IsCanceled || 
                Task.CurrentStackGuard.TryBeginInliningScope() == false) 
            {
                return false; 
            }

            // Task class will still call into TaskScheduler.TryRunInline rather than TryExecuteTaskInline() so that
            // 1) we can adjust the return code from TryExecuteTaskInline in case a buggy custom scheduler lies to us 
            // 2) we maintain a mechanism for the TLS lookup optimization that we used to have for the ConcRT scheduler (will potentially introduce the same for TP)
            bool bInlined = false; 
            try 
            {
                bInlined = TryExecuteTaskInline(task, taskWasPreviouslyQueued); 
            }
            finally
            {
                Task.CurrentStackGuard.EndInliningScope(); 
            }
 
            // If the custom scheduler returned true, we should either have the TASK_STATE_DELEGATE_INVOKED or TASK_STATE_CANCELED bit set 
            // Otherwise the scheduler is buggy
            if (bInlined && !(task.IsDelegateInvoked || task.IsCanceled)) 
            {
                throw new InvalidOperationException(Environment.GetResourceString("TaskScheduler_InconsistentStateAfterTryExecuteTaskInline"));
            }
 
            return bInlined;
        } 
 
        [SecuritySafeCritical]
        internal bool TryRunInline(Task task, bool taskWasPreviouslyQueued) 
        {
            return TryRunInline(task, taskWasPreviouslyQueued, GetThreadStatics());
        }
 
        /// 
        /// Attempts to dequeue a Task that was previously queued to 
        /// this scheduler. 
        /// 
        /// The Task to be dequeued. 
        /// A Boolean denoting whether the  argument was successfully dequeued.
        /// The  argument is null.
        [SecurityCritical]
        protected internal virtual bool TryDequeue(Task task) 
        {
            return false; 
        } 

        ///  
        /// Notifies the scheduler that a work item has made progress.
        /// 
        internal virtual void NotifyWorkItemProgress()
        { 
        }
 
        ///  
        /// Indicates whether this is a custom scheduler, in which case the safe code paths will be taken upon task entry
        /// using a CAS to transition from queued state to executing. 
        /// 
        internal virtual bool RequiresAtomicStartTransition
        {
            get { return true; } 
        }
 
 

        //////////////////////////////////////////////////////////// 
        //
        // Member variables
        //
 
        // An AppDomain-wide default manager.
        private static TaskScheduler s_defaultTaskScheduler = new ThreadPoolTaskScheduler(); 
 
        //static counter used to generate unique TaskScheduler IDs
        internal static int s_taskSchedulerIdCounter; 

        // this TaskScheduler's unique ID
        private int m_taskSchedulerId;
 
        // We keep a weak reference to ourselves to be uniquely identified in the global
        // static collection of active schedulers without being pinned down in memory, as well 
        // to convert it later to a real TS object when enumerating for the debugger 
        internal WeakReference m_weakReferenceToSelf;
 
        // The global container that keeps track of TaskScheduler instances. Lazily initialized
        private static ConcurrentDictionary s_activeTaskSchedulers;

 
        ////////////////////////////////////////////////////////////
        // 
        // Constructors and public properties 
        //
 
        /// 
        /// Initializes the .
        /// 
        protected TaskScheduler() 
        {
            // Protected constructor. It's here to ensure all user implemented TaskSchedulers will be 
            // registered in the active schedulers list. 
            m_weakReferenceToSelf = new WeakReference(this);
            RegisterTaskScheduler(this); 
        }

        /// 
        /// Frees all resources associated with this scheduler. 
        /// 
        ~TaskScheduler() 
        { 
            // Finalizer to remove us out of the active schedulers list
            UnregisterTaskScheduler(this); 
        }

        /// 
        /// Gets the default TaskScheduler instance. 
        /// 
        public static TaskScheduler Default 
        { 
            get
            { 
                return s_defaultTaskScheduler;
            }
        }
 
        /// 
        /// Gets the TaskScheduler 
        /// associated with the currently executing task. 
        /// 
        ///  
        /// When not called from within a task,  will return the  scheduler.
        /// 
        public static TaskScheduler Current
        { 
            get
            { 
                Task currentTask = Task.InternalCurrent; 

                if (currentTask != null) 
                {
                    return currentTask.ExecutingTaskScheduler;
                }
                else 
                {
                    return TaskScheduler.Default; 
                } 
            }
        } 

        /// 
        /// Creates a 
        /// associated with the current . 
        /// 
        ///  
        /// All Task instances queued to 
        /// the returned scheduler will be executed through a call to the
        /// Post method 
        /// on that context.
        /// 
        /// 
        /// A  associated with 
        /// the current SynchronizationContext, as
        /// determined by SynchronizationContext.Current. 
        ///  
        /// 
        /// The current SynchronizationContext may not be used as a TaskScheduler. 
        /// 
        public static TaskScheduler FromCurrentSynchronizationContext()
        {
            return new SynchronizationContextTaskScheduler(); 
        }
 
        ///  
        /// Gets the unique ID for this .
        ///  
        public Int32 Id
        {
            get
            { 
                if (m_taskSchedulerId == 0)
                { 
                    int newId = 0; 

                    // We need to repeat if Interlocked.Increment wraps around and returns 0. 
                    // Otherwise next time this scheduler's Id is queried it will get a new value
                    do
                    {
                        newId = Interlocked.Increment(ref s_taskSchedulerIdCounter); 
                    } while (newId == 0);
 
                    Interlocked.CompareExchange(ref m_taskSchedulerId, newId, 0); 
                }
 
                return m_taskSchedulerId;
            }
        }
 
        /// 
        /// Attempts to execute the provided Task 
        /// on this scheduler. 
        /// 
        ///  
        /// 
        /// Scheduler implementations are provided with Task
        /// instances to be executed through either the  method or the
        ///  method. When the scheduler deems it appropriate to run the 
        /// provided task,  should be used to do so. TryExecuteTask handles all
        /// aspects of executing a task, including action invocation, exception handling, state management, 
        /// and lifecycle control. 
        /// 
        ///  
        ///  must only be used for tasks provided to this scheduler by the .NET
        /// Framework infrastructure. It should not be used to execute arbitrary tasks obtained through
        /// custom mechanisms.
        ///  
        /// 
        ///  
        /// A Task object to be executed. 
        /// 
        /// The  is not associated with this scheduler. 
        /// 
        /// A Boolean that is true if  was successfully executed, false if it
        /// was not. A common reason for execution failure is that the task had previously been executed or
        /// is in the process of being executed by another thread. 
        [SecurityCritical]
        protected bool TryExecuteTask(Task task) 
        { 
            if (task.ExecutingTaskScheduler != this)
            { 
                throw new InvalidOperationException(Environment.GetResourceString("TaskScheduler_ExecuteTask_WrongTaskScheduler"));
            }

            return task.ExecuteEntry(true); 
        }
 
        //////////////////////////////////////////////////////////// 
        //
        // Events 
        //

        private static event EventHandler _unobservedTaskException;
        private static object _unobservedTaskExceptionLockObject = new object(); 

        ///  
        /// Occurs when a faulted 's unobserved exception is about to trigger exception escalation 
        /// policy, which, by default, would terminate the process.
        ///  
        /// 
        /// This AppDomain-wide event provides a mechanism to prevent exception
        /// escalation policy (which, by default, terminates the process) from triggering.
        /// Each handler is passed a  
        /// instance, which may be used to examine the exception and to mark it as observed.
        ///  
        public static event EventHandler UnobservedTaskException 
        {
            [System.Security.SecurityCritical] 
            add
            {
                if (value != null)
                { 
#if !PFX_LEGACY_3_5
                    RuntimeHelpers.PrepareContractedDelegate(value); 
#endif 
                    lock (_unobservedTaskExceptionLockObject) _unobservedTaskException += value;
                } 
            }

            [System.Security.SecurityCritical]
            remove 
            {
                lock (_unobservedTaskExceptionLockObject) _unobservedTaskException -= value; 
            } 
        }
 



 

        //////////////////////////////////////////////////////////// 
        // 
        // Internal methods
        // 

        // This is called by the TaskExceptionHolder finalizer.
        internal static void PublishUnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs ueea)
        { 
            // Lock this logic to prevent just-unregistered handlers from being called.
            lock (_unobservedTaskExceptionLockObject) 
            { 
                // Since we are under lock, it is technically no longer necessary
                // to make a copy.  It is done here for convenience. 
                EventHandler handler = _unobservedTaskException;
                if (handler != null)
                {
                    handler(sender, ueea); 
                }
            } 
        } 

        ///  
        /// Provides an array of all queued Task instances
        /// for the debugger.
        /// 
        ///  
        /// The returned array is populated through a call to .
        /// Note that this function is only meant to be invoked by a debugger remotely. 
        /// It should not be called by any other codepaths. 
        /// 
        /// An array of Task instances. 
        /// 
        /// This scheduler is unable to generate a list of queued tasks at this time.
        /// 
        [SecurityCritical] 
        internal Task[] GetScheduledTasksForDebugger()
        { 
            // this can throw InvalidOperationException indicating that they are unable to provide the info 
            // at the moment. We should let the debugger receive that exception so that it can indicate it in the UI
            IEnumerable activeTasksSource = GetScheduledTasks(); 

            if (activeTasksSource == null)
                return null;
 
            // If it can be cast to an array, use it directly
            Task[] activeTasksArray = activeTasksSource as Task[]; 
            if (activeTasksArray == null) 
            {
                activeTasksArray = (new List(activeTasksSource)).ToArray(); 
            }

            // touch all Task.Id fields so that the debugger doesn't need to do a lot of cross-proc calls to generate them
            foreach (Task t in activeTasksArray) 
            {
                int tmp = t.Id; 
            } 

            return activeTasksArray; 
        }

        /// 
        /// Provides an array of all active TaskScheduler 
        /// instances for the debugger.
        ///  
        ///  
        /// This function is only meant to be invoked by a debugger remotely.
        /// It should not be called by any other codepaths. 
        /// 
        /// An array of TaskScheduler instances.
        [SecurityCritical]
        internal static TaskScheduler[] GetTaskSchedulersForDebugger() 
        {
            // To populate this array we walk the global collection of schedulers (s_activeTaskSchedulers). 
 
            TaskScheduler[] activeSchedulers = new TaskScheduler[s_activeTaskSchedulers.Count];
 
            IEnumerator> tsEnumerator = s_activeTaskSchedulers.GetEnumerator();

            int index = 0;
            while (tsEnumerator.MoveNext()) 
            {
                // convert the weak reference to the real TS object 
                TaskScheduler ts = tsEnumerator.Current.Key.Target as TaskScheduler; 
                if (ts == null)
                    continue; 
                activeSchedulers[index++] = ts;
                int tmp = ts.Id;
            }
 
            return activeSchedulers;
        } 
 
        /// 
        /// Registers a new TaskScheduler instance in the global collection of schedulers. 
        /// 
        internal static void RegisterTaskScheduler(TaskScheduler ts)
        {
            LazyInitializer.EnsureInitialized>(ref s_activeTaskSchedulers); 

            bool bResult = s_activeTaskSchedulers.TryAdd(ts.m_weakReferenceToSelf, null); 
            Contract.Assert(bResult); 
        }
 
        /// 
        /// Removes a TaskScheduler instance from the global collection of schedulers.
        /// 
        internal static void UnregisterTaskScheduler(TaskScheduler ts) 
        {
            Contract.Assert(s_activeTaskSchedulers != null); 
 
            object tmpObj;
            bool bResult = s_activeTaskSchedulers.TryRemove(ts.m_weakReferenceToSelf, out tmpObj); 

            Contract.Assert(bResult);
        }
 

        ///  
        /// Nested class that provides debugger view for TaskScheduler 
        /// 
        internal sealed class SystemThreadingTasks_TaskSchedulerDebugView 
        {
            private readonly TaskScheduler m_taskScheduler;
            public SystemThreadingTasks_TaskSchedulerDebugView(TaskScheduler scheduler)
            { 
                m_taskScheduler = scheduler;
            } 
 
            // returns the scheduler�s Id
            public Int32 Id 
            {
                get { return m_taskScheduler.Id; }
            }
 
            // returns the scheduler�s GetScheduledTasks
            public IEnumerable ScheduledTasks 
            { 
                [SecurityCritical]
                get { return m_taskScheduler.GetScheduledTasks(); } 
            }
        }

    } 

 
 

    ///  
    /// A TaskScheduler implementation that executes all tasks queued to it through a call to
    ///  on the 
    /// that its associated with. The default constructor for this class binds to the current 
    ///  
    internal sealed class SynchronizationContextTaskScheduler : TaskScheduler
    { 
        private SynchronizationContext m_synchronizationContext; 

        ///  
        /// Constructs a SynchronizationContextTaskScheduler associated with 
        /// 
        /// This constructor expects  to be set.
        internal SynchronizationContextTaskScheduler() 
        {
            SynchronizationContext synContext = SynchronizationContext.Current; 
 
            // make sure we have a synccontext to work with
            if (synContext == null) 
            {
                throw new InvalidOperationException(Environment.GetResourceString("TaskScheduler_FromCurrentSynchronizationContext_NoCurrent"));
            }
 
            m_synchronizationContext = synContext;
 
        } 

        ///  
        /// Implemetation of  for this scheduler class.
        ///
        /// Simply posts the tasks to be executed on the associated .
        ///  
        /// 
        [SecurityCritical] 
        protected internal override void QueueTask(Task task) 
        {
            m_synchronizationContext.Post(s_postCallback, (object)task); 
        }

        /// 
        /// Implementation of   for this scheduler class. 
        ///
        /// The task will be executed inline only if the call happens within 
        /// the associated . 
        /// 
        ///  
        /// 
        [SecurityCritical]
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        { 
            if (SynchronizationContext.Current == m_synchronizationContext)
            { 
                return TryExecuteTask(task); 
            }
            else 
                return false;
        }

        // not implemented 
        [SecurityCritical]
        protected override IEnumerable GetScheduledTasks() 
        { 
            return null;
        } 

        /// 
        /// Implementes the  property for
        /// this scheduler class. 
        ///
        /// By default it returns 1, because a  based 
        /// scheduler only supports execution on a single thread. 
        /// 
        public override Int32 MaximumConcurrencyLevel 
        {
            get
            {
                return 1; 
            }
        } 
 
        // preallocated SendOrPostCallback delegate
        private static SendOrPostCallback s_postCallback = new SendOrPostCallback(PostCallback); 

        // this is where the actual task invocation occures
        private static void PostCallback(object obj)
        { 
            Task task = (Task) obj;
 
            // calling ExecuteEntry with double execute check enabled because a user implemented SynchronizationContext could be buggy 
            task.ExecuteEntry(true);
        } 
    }

    /// 
    /// Provides data for the event that is raised when a faulted 's 
    /// exception goes unobserved.
    ///  
    ///  
    /// The Exception property is used to examine the exception without marking it
    /// as observed, whereas the  method is used to mark the exception 
    /// as observed.  Marking the exception as observed prevents it from triggering exception escalation policy
    /// which, by default, terminates the process.
    /// 
    public class UnobservedTaskExceptionEventArgs : EventArgs 
    {
        private AggregateException m_exception; 
        internal bool m_observed = false; 

        ///  
        /// Initializes a new instance of the  class
        /// with the unobserved exception.
        /// 
        /// The Exception that has gone unobserved. 
        public UnobservedTaskExceptionEventArgs(AggregateException exception) { m_exception = exception; }
 
        ///  
        /// Marks the  as "observed," thus preventing it
        /// from triggering exception escalation policy which, by default, terminates the process. 
        /// 
        public void SetObserved() { m_observed = true; }

        ///  
        /// Gets whether this exception has been marked as "observed."
        ///  
        public bool Observed { get { return m_observed; } } 

        ///  
        /// The Exception that went unobserved.
        /// 
        public AggregateException Exception { get { return m_exception; } }
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK