SynchronizedDispatch.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / clr / src / BCL / System / Runtime / Remoting / SynchronizedDispatch.cs / 1305376 / SynchronizedDispatch.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
//
//   Synchronization Property for URT Contexts. Uses the ThreadPool API. 
//   An instance of this property in a context enforces a synchronization 
//   domain for the context (and all contexts that share the same instance).
//   This means that at any instant, at most 1 thread could be executing 
//   in all contexts that share an instance of this property.
//
//   This is done by contributing sinks that intercept and serialize in-coming
//   calls for the respective contexts. 
//
//   If the property is marked for re-entrancy, then call-outs are 
//   intercepted too. The call-out interception allows other waiting threads 
//   to enter the synchronization domain for maximal throughput.
// 
namespace System.Runtime.Remoting.Contexts {
    using System.Threading;
    using System.Runtime.Remoting;
    using System.Runtime.Remoting.Messaging; 
    using System.Runtime.Remoting.Activation;
    using System.Security.Permissions; 
    using System; 
    using System.Diagnostics.Contracts;
    using Queue = System.Collections.Queue; 
    using ArrayList = System.Collections.ArrayList;
    [System.Security.SecurityCritical]  // auto-generated_required
    [Serializable]
    [AttributeUsage(AttributeTargets.Class)] 
    [SecurityPermissionAttribute(SecurityAction.InheritanceDemand, Flags=SecurityPermissionFlag.Infrastructure)]
    [System.Runtime.InteropServices.ComVisible(true)] 
    public class SynchronizationAttribute 
        : ContextAttribute, IContributeServerContextSink,
                    IContributeClientContextSink 
    {
        // The class should not be instantiated in a context that has Synchronization
        public const int NOT_SUPPORTED  = 0x00000001;
 
        // The class does not care if the context has Synchronization or not
        public const int SUPPORTED      = 0x00000002; 
 
        // The class should be instantiated in a context that has Synchronization
        public const int REQUIRED    = 0x00000004; 

        // The class should be instantiated in a context with a new instance of
        // Synchronization property each time
        public const int REQUIRES_NEW = 0x00000008; 

        private const String PROPERTY_NAME = "Synchronization"; 
 
        private static readonly int _timeOut = -1;
        // event that releases a thread-pool worker thread 
        [NonSerialized]
        internal AutoResetEvent _asyncWorkEvent;
        [NonSerialized]
        private RegisteredWaitHandle _waitHandle; 

        // queue of work items. 
        [NonSerialized] 
        internal Queue _workItemQueue;
        // flag for the domain lock (access always synchronized on the _workItemQueue) 
        [NonSerialized]
        internal bool _locked;
        // flag to indicate if the lock should be released during call-outs
        internal bool _bReEntrant; 
        // flag for use as an attribute on types
        internal int _flavor; 
 
        [NonSerialized]
        private SynchronizationAttribute _cliCtxAttr; 
        // Logical call id (used only in non-reentrant case for deadlock avoidance)
        [NonSerialized]
        private String _syncLcid;
        [NonSerialized] 
        private ArrayList _asyncLcidList;
 
 
        public virtual bool Locked {get { return _locked;} set { _locked=value; } }
        public virtual bool IsReEntrant { get { return _bReEntrant;} } 

        internal String SyncCallOutLCID
        {
            get 
            {
                Contract.Assert( 
                    !_bReEntrant, 
                    "Should not use this for the reentrant case");
 
                return _syncLcid;
            }

            set 
            {
                Contract.Assert( 
                    !_bReEntrant, 
                    "Should not use this for the reentrant case");
 
                Contract.Assert(
                    _syncLcid==null
                        || (_syncLcid!=null && value==null)
                        || _syncLcid.Equals(value), 
                    "context can be associated with one logical call at a time");
 
                _syncLcid = value; 
            }
        } 

        internal ArrayList AsyncCallOutLCIDList
        {
            get { return _asyncLcidList; } 
        }
 
        internal bool IsKnownLCID(IMessage reqMsg) 
        {
            String msgLCID = 
                ((LogicalCallContext)reqMsg.Properties[Message.CallContextKey])
                    .RemotingData.LogicalCallID;
            return ( msgLCID.Equals(_syncLcid)
                    || _asyncLcidList.Contains(msgLCID)); 

        } 
 

        /* 
        *   Constructor for the synchronized dispatch property
        */
        public SynchronizationAttribute()
 
            : this(REQUIRED, false) {
        } 
 
        /*
        *   Constructor. 
        *   If reEntrant is true, we allow other calls to come in
        *   if the currently running call leaves the domain for a call-out.
        */
        public SynchronizationAttribute(bool reEntrant) 

            : this(REQUIRED, reEntrant) { 
        } 

        public SynchronizationAttribute(int flag) 

            : this(flag, false) {
        }
 
        public SynchronizationAttribute(int flag, bool reEntrant)
 
            // Invoke ContextProperty ctor! 
            : base(PROPERTY_NAME) {
 
            _bReEntrant = reEntrant;

            switch (flag)
            { 
            case NOT_SUPPORTED:
            case SUPPORTED: 
            case REQUIRED: 
            case REQUIRES_NEW:
                _flavor = flag; 
                break;
            default:
                throw new ArgumentException(Environment.GetResourceString("Argument_InvalidFlag"), "flag");
            } 
        }
 
        // Dispose off the WaitHandle registered in Initialization 
        internal void Dispose()
        { 
            //Unregister the RegisteredWaitHandle
            if (_waitHandle != null)
                _waitHandle.Unregister(null);
        } 

        // Override ContextAttribute's implementation of IContextAttribute::IsContextOK 
        [System.Security.SecurityCritical] 
        [System.Runtime.InteropServices.ComVisible(true)]
        public override bool IsContextOK(Context ctx, IConstructionCallMessage msg) 
        {
            if (ctx == null)
                throw new ArgumentNullException("ctx");
            if (msg == null) 
                throw new ArgumentNullException("msg");
            Contract.EndContractBlock(); 
 
            // <
 
            bool isOK = true;
            if (_flavor == REQUIRES_NEW)
            {
                isOK = false; 
                // Each activation request instantiates a new attribute class.
                // We are relying on that for the REQUIRES_NEW case! 
                Contract.Assert(ctx.GetProperty(PROPERTY_NAME) != this, 
                    "ctx.GetProperty(PROPERTY_NAME) != this");
            } 
            else
            {
                SynchronizationAttribute syncProp = (SynchronizationAttribute) ctx.GetProperty(PROPERTY_NAME);
                if (   ( (_flavor == NOT_SUPPORTED)&&(syncProp != null) ) 
                    || ( (_flavor == REQUIRED)&&(syncProp == null) )
                    ) 
                { 
                    isOK = false;
                } 

                if (_flavor == REQUIRED)
                {
                    // pick up the property from the current context 
                    _cliCtxAttr = syncProp;
                } 
            } 
            return isOK;
        } 

        // Override ContextAttribute's impl. of IContextAttribute::GetPropForNewCtx
        [System.Security.SecurityCritical]
        [System.Runtime.InteropServices.ComVisible(true)] 
        public override void GetPropertiesForNewContext(IConstructionCallMessage ctorMsg)
        { 
            if ( (_flavor==NOT_SUPPORTED) || (_flavor==SUPPORTED) || (null == ctorMsg) ) 
            {
                return ; 
            }

            if (_cliCtxAttr != null)
            { 
                Contract.Assert(_flavor == REQUIRED,"Use cli-ctx property only for the REQUIRED flavor");
                ctorMsg.ContextProperties.Add((IContextProperty)_cliCtxAttr); 
                _cliCtxAttr = null; 
            }
            else 
            {
                ctorMsg.ContextProperties.Add((IContextProperty)this);
            }
        } 

        // We need this to make the use of the property as an attribute 
        // light-weight. This allows us to delay initialize everything we 
        // need to fully function as a ContextProperty.
        internal virtual void InitIfNecessary() 
        {
            lock(this)
            {
                if (_asyncWorkEvent == null) 
                {
                    // initialize thread pool event to non-signaled state. 
                    _asyncWorkEvent = new AutoResetEvent(false); 

                    _workItemQueue = new Queue(); 
                    _asyncLcidList = new ArrayList();

                    WaitOrTimerCallback callBackDelegate =
                        new WaitOrTimerCallback(this.DispatcherCallBack); 

                    // Register a callback to be executed by the thread-pool 
                    // each time the event is signaled. 
                    _waitHandle = ThreadPool.RegisterWaitForSingleObject(
                                    _asyncWorkEvent, 
                                    callBackDelegate,
                                    null, // state info
                                    _timeOut,
                                    false); // bExecuteOnlyOnce 
                }
            } 
        } 

        /* 
        * Call back function -- executed for each work item that
        * was enqueued. This is invoked by a thread-pool thread for
        * async work items and the caller thread for [....] items.
        */ 
        private void DispatcherCallBack(Object stateIgnored, bool ignored)
        { 
            // This function should be called by only one thread at a time. We will 
            // ensure this by releasing exactly one waiting thread to go work on
            // a WorkItem 

            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] --- In DispatherCallBack ");

            Contract.Assert(_locked==true,"_locked==true"); 
            WorkItem work;
            // get the work item out of the queue. 
            lock (_workItemQueue) 
            {
                work = (WorkItem) _workItemQueue.Dequeue(); 
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] --- Dequeued Work for: " + work._thread.GetHashCode());
            }
            Contract.Assert(work!=null,"work!=null");
            Contract.Assert(work.IsSignaled() && !(work.IsDummy()),"work.IsSignaled() && !(work.IsDummy())"); 
            // execute the work item (WorkItem.Execute will switch to the proper context)
            ExecuteWorkItem(work); 
            HandleWorkCompletion(); 
            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] --- CallBack finished for: " + work._thread.GetHashCode());
        } 

        /*
        *   This is used by the call-out (client context) sinks to notify
        *   the domain manager that the thread is leaving 
        */
        internal virtual void HandleThreadExit() 
        { 
            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~~ Thread EXIT ~~~~");
            // For now treat this as if the work was completed! 
            Contract.Assert(_locked==true,"_locked==true");
            HandleWorkCompletion();
        }
 
        /*
        *   This is used by a returning call-out thread to request 
        *   that it be queued for re-entry into the domain. 
        */
        internal virtual void HandleThreadReEntry() 
        {
            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~~ Thread REQUEST REENTRY ~~~~");
            // Treat this as if a new work item needs to be done
            // < 

            WorkItem work = new WorkItem(null, null, null); 
            work.SetDummy(); 
            HandleWorkRequest(work);
        } 

        /*
        *   This gets called at the end of work.Execute and from
        *   HandleThreadExit() in the re-entrant scenario. 
        *   This is the point where we decide what to do next!
        */ 
        internal virtual void HandleWorkCompletion() 
        {
            // We should still have the lock held for the workItem that just completed 
            Contract.Assert(_locked==true,"_locked==true");
            // Now we check the queue to see if we need to release any one?
            WorkItem nextWork = null;
            bool bNotify = false; 
            lock (_workItemQueue)
            { 
                if (_workItemQueue.Count >= 1) 
                {
                    nextWork = (WorkItem) _workItemQueue.Peek(); 
                    bNotify = true;
                    nextWork.SetSignaled();
                }
                else 
                {
                    // We set locked to false only in the case there is no 
                    // next work to be done. 
                    // NOTE: this is the only place _locked in ever set to false!
                    //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] Domain UNLOCKED!"); 
                    _locked = false;
                }
            }
            // See if we found a non-signaled work item at the head. 
            if (bNotify)
            { 
                // In both [....] and async cases we just hand off the _locked state to 
                // the next thread which will execute.
                if (nextWork.IsAsync()) 
                {
                    // Async-WorkItem: signal ThreadPool event to release one thread
                    _asyncWorkEvent.Set();
                    //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Signal " + nextWork._thread.GetHashCode() + (nextWork.IsDummy()?" DUMMY ":" REAL ")); 
                }
                else 
                { 
                    // [....]-WorkItem: notify the waiting [....]-thread.
                    lock(nextWork) 
                    {
                        Monitor.Pulse(nextWork);
                        //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ Notify " + nextWork._thread.GetHashCode() + (nextWork.IsDummy()?" DUMMY ":" REAL ") );
                    } 
                }
            } 
        } 

        /* 
        *   This is called by any new incoming thread or from
        *   HandleThreadReEntry() when a call-out thread wants to
        *   re-enter the domain.
        *   In the latter case, the WorkItem is a dummy item, it 
        *   just serves the purpose of something to block on till
        *   the thread is given a green signal to re-enter. 
        */ 
        internal virtual void HandleWorkRequest(WorkItem work)
        { 
            // <


            bool bQueued; 

            // Check for nested call backs 
            if (!IsNestedCall(work._reqMsg)) 
            {
                // See what type of work it is 
                if (work.IsAsync())
                {
                    // Async work is always queued.
                    bQueued = true; 
                    // Enqueue the workItem
                    lock (_workItemQueue) 
                    { 
                        //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Async Item EnQueue " + work._thread.GetHashCode());
                        work.SetWaiting(); 
                        _workItemQueue.Enqueue(work);
                        // If this is the only work item in the queue we will
                        // have to trigger the thread-pool event ourselves
                        if ( (!_locked) && (_workItemQueue.Count == 1) ) 
                        {
                            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Async Signal Self: " + work._thread.GetHashCode()); 
                            work.SetSignaled(); 
                            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Domain Locked!");
                            _locked = true; 
                            _asyncWorkEvent.Set();
                        }
                    }
                } 
                else
                { 
                    // [....] work is queued only if there are other items 
                    // already in the queue.
                    lock(work) 
                    {
                        // Enqueue if we need to
                        lock(_workItemQueue)
                        { 
                            if ((!_locked) && (_workItemQueue.Count == 0))
                            { 
                                _locked = true; 
                                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Domain Locked!");
                                bQueued = false; 
                            }
                            else
                            {
                                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ ENQUEUE [....]!" + (work.IsDummy()?" DUMMY ":" REAL ") + work._thread); 
                                bQueued = true;
                                work.SetWaiting(); 
                                _workItemQueue.Enqueue(work); 
                            }
                        } 

                        if (bQueued == true)
                        {
                            // If we queued a work item we must wait for some 
                            // other thread to peek at it and Notify us.
 
                            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ WORK::WAIT" + work._thread); 
                            Monitor.Wait(work);
                            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ FINISH Work::WAIT" + work._thread); 
                            Contract.Assert(_locked==true,"_locked==true");
                            // Our turn to complete the work!
                            // Execute the callBack only if this is real work
                            if (!work.IsDummy()) 
                            {
                                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ Invoke DispatcherCallBack " + work._thread); 
                                // We invoke the callback here that does exactly 
                                // what we need to do ... dequeue work, execute, checkForMore
                                DispatcherCallBack(null, true); 
                            }
                            else
                            {
                                // DummyWork is just use to block/unblock a returning call. 
                                // Throw away our dummy WorkItem.
                                lock(_workItemQueue) 
                                { 
                                    _workItemQueue.Dequeue();
                                } 
                                // We don't check for more work here since we are already
                                // in the midst of an executing WorkItem (at the end of which
                                // the check will be performed)
                            } 
                        }
                        else 
                        { 
                            // We did not queue the work item.
                            if (!work.IsDummy()) 
                            {
                                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ Execute direct" + work._thread);
                                // Execute the work.
                                Contract.Assert(_locked==true,"_locked==true"); 
                                work.SetSignaled();
                                ExecuteWorkItem(work); 
                                // Check for more work 
                                HandleWorkCompletion();
                            } 
                        }
                    }
                }
            } 
            else
            { 
                // We allow the nested calls to execute directly 

                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ Execute Nested Call direct" + work._thread); 
                // Execute the work.
                Contract.Assert(_locked==true,"_locked==true");
                work.SetSignaled();
                work.Execute(); 
                // We are still inside the top level call ...
                // so after work.Execute finishes we don't check for more work 
                // or unlock the domain as we do elsewhere. 
            }
        } 

        internal void ExecuteWorkItem(WorkItem work)
        {
            work.Execute(); 
        }
 
        internal bool IsNestedCall(IMessage reqMsg) 
        {
            // This returns TRUE only if it is a non-reEntrant context 
            // AND
            // (the LCID of the reqMsg matches that of
            // the top level [....] call lcid associated with the context.
            //  OR 
            // it matches one of the async call out lcids)
 
            bool bNested = false; 
            if (!IsReEntrant)
            { 
                String lcid = SyncCallOutLCID;
                if (lcid != null)
                {
                    // This means we are inside a top level call 
                    LogicalCallContext callCtx =
                        (LogicalCallContext)reqMsg.Properties[Message.CallContextKey]; 
 
                    if ( callCtx!=null &&
                        lcid.Equals(callCtx.RemotingData.LogicalCallID)) 
                    {
                        // This is a nested call (we made a call out during
                        // the top level call and eventually that has resulted
                        // in an incoming call with the same lcid) 
                        bNested = true;
                    } 
                } 
                if (!bNested && AsyncCallOutLCIDList.Count>0)
                { 
                    // This means we are inside a top level call
                    LogicalCallContext callCtx =
                        (LogicalCallContext)reqMsg.Properties[Message.CallContextKey];
                    if (AsyncCallOutLCIDList.Contains(callCtx.RemotingData.LogicalCallID)) 
                    {
                        bNested = true; 
                    } 
                }
            } 
            return bNested;
        }

 
        /*
        *   Implements IContributeServerContextSink::GetServerContextSink 
        *   Create a SynchronizedDispatch sink and return it. 
        */
        [System.Security.SecurityCritical] 
        public virtual IMessageSink GetServerContextSink(IMessageSink nextSink)
        {
            InitIfNecessary();
 
            SynchronizedServerContextSink propertySink =
                new SynchronizedServerContextSink( 
                            this, 
                            nextSink);
 
            return (IMessageSink) propertySink;
        }

        /* 
        *   Implements IContributeClientContextSink::GetClientContextSink
        *   Create a CallOut sink and return it. 
        */ 
        [System.Security.SecurityCritical]
        public virtual IMessageSink GetClientContextSink(IMessageSink nextSink) 
        {
            InitIfNecessary();

            SynchronizedClientContextSink propertySink = 
                new SynchronizedClientContextSink(
                            this, 
                            nextSink); 

            return (IMessageSink) propertySink; 
        }

    }
 
    /*************************************** SERVER SINK ********************************/
    /* 
    *   Implements the sink contributed by the Synch-Dispatch 
    *   Property. The sink holds a back pointer to the property.
    *   The sink intercepts incoming calls to objects resident in 
    *   the Context and co-ordinates with the property to enforce
    *   the domain policy.
    */
    internal class SynchronizedServerContextSink 
            : InternalSink, IMessageSink
    { 
        internal IMessageSink   _nextSink; 
        [System.Security.SecurityCritical /*auto-generated*/]
        internal SynchronizationAttribute _property; 

        [System.Security.SecurityCritical]  // auto-generated
        internal SynchronizedServerContextSink(SynchronizationAttribute prop, IMessageSink nextSink)
        { 
            _property = prop;
            _nextSink = nextSink; 
        } 

        [System.Security.SecuritySafeCritical]  // auto-generated 
        ~SynchronizedServerContextSink()
        {
            _property.Dispose();
        } 

        /* 
        * Implements IMessageSink::SyncProcessMessage 
        */
        [System.Security.SecurityCritical]  // auto-generated 
        public virtual IMessage SyncProcessMessage(IMessage reqMsg)
        {
            // 1. Create a work item
            WorkItem work = new WorkItem(reqMsg, 
                                        _nextSink,
                                        null /* replySink */); 
 
            // 2. Notify the property to handle the WorkItem
            // The work item may get put in a Queue or may execute directly 
            // if the domain is free.
            _property.HandleWorkRequest(work);

            // 3. Pick up retMsg from the WorkItem and return 
            return work.ReplyMessage;
        } 
 
        /*
        *   Implements IMessageSink::AsyncProcessMessage 
        */
        [System.Security.SecurityCritical]  // auto-generated
        public virtual IMessageCtrl AsyncProcessMessage(IMessage reqMsg, IMessageSink replySink)
        { 
            // 1. Create a work item
            WorkItem work = new WorkItem(reqMsg, 
                                        _nextSink, 
                                        replySink);
            work.SetAsync(); 
            // 2. We always queue the work item in async case
            _property.HandleWorkRequest(work);
            // 3. Return an IMsgCtrl
            return null; 
        }
 
        /* 
        * Implements IMessageSink::GetNextSink
        */ 
        public IMessageSink NextSink
        {
            [System.Security.SecurityCritical]  // auto-generated
            get 
            {
                return _nextSink; 
            } 
        }
    } 

    //*************************************** WORK ITEM ********************************//
    /*
    *   A work item holds the info about a call to [....] or 
    *   Async-ProcessMessage.
    */ 
    internal class WorkItem 
    {
        private const int FLG_WAITING  = 0x0001; 
        private const int FLG_SIGNALED = 0x0002;
        private const int FLG_ASYNC      = 0x0004;
        private const int FLG_DUMMY     = 0x0008;
 
        internal int _flags;
        internal IMessage _reqMsg; 
        internal IMessageSink _nextSink; 
        // ReplySink will be null for an [....] work item.
        internal IMessageSink _replySink; 
        // ReplyMsg is set once the [....] call is completed
        internal IMessage _replyMsg;

        // Context in which the work should execute. 
        internal Context _ctx;
 
        [System.Security.SecurityCritical /*auto-generated*/] 
        internal LogicalCallContext _callCtx;
        internal static InternalCrossContextDelegate _xctxDel = new InternalCrossContextDelegate(ExecuteCallback); 

        //DBGDBG
        //internal int _thread;
 
        [System.Security.SecuritySafeCritical]  // auto-generated
        static WorkItem() 
        { 
        }
 
        [System.Security.SecurityCritical]  // auto-generated
        internal WorkItem(IMessage reqMsg, IMessageSink nextSink, IMessageSink replySink)
        {
            _reqMsg = reqMsg; 
            _replyMsg = null;
            _nextSink = nextSink; 
            _replySink = replySink; 
            _ctx = Thread.CurrentContext;
            _callCtx = CallContext.GetLogicalCallContext(); 
            //DBGDBG
            //_thread = Thread.CurrentThread.GetHashCode();
        }
 
        // To mark a work item being enqueued
        internal virtual void SetWaiting() 
        { 
            Contract.Assert(!IsWaiting(),"!IsWaiting()");
            _flags |= FLG_WAITING; 
        }

        internal virtual bool IsWaiting()
        { 
            return (_flags&FLG_WAITING) == FLG_WAITING;
        } 
 
        // To mark a work item that has been given the green light!
        internal virtual void SetSignaled() 
        {
            Contract.Assert(!IsSignaled(),"!IsSignaled()");
            _flags |= FLG_SIGNALED;
        } 

        internal virtual bool IsSignaled() 
        { 
            return (_flags & FLG_SIGNALED) == FLG_SIGNALED;
        } 

        internal virtual void SetAsync()
        {
            _flags |= FLG_ASYNC; 
        }
 
        internal virtual bool IsAsync() 
        {
            return (_flags & FLG_ASYNC) == FLG_ASYNC; 
        }

        internal virtual void SetDummy()
        { 
            _flags |= FLG_DUMMY;
        } 
 
        internal virtual bool IsDummy()
        { 
            return (_flags & FLG_DUMMY) == FLG_DUMMY;
        }

 
        [System.Security.SecurityCritical]  // auto-generated
        internal static Object ExecuteCallback(Object[] args) 
        { 
            WorkItem This = (WorkItem) args[0];
 
            if (This.IsAsync())
            {
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] AsyncWork.Execute");
                This._nextSink.AsyncProcessMessage(This._reqMsg, This._replySink); 
            }
            else if (This._nextSink != null) 
            { 
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] SyncWork.Execute");
                This._replyMsg = This._nextSink.SyncProcessMessage(This._reqMsg); 
            }
            return null;
        }
 
        /*
        *   Execute is called to complete a work item ([....] or async). 
        *   Execute assumes that the context is set correctly and the lock 
        *   is taken (i.e. it makes no policy decisions)
        * 
        *   It is called from the following 3 points:
        *       1. thread pool thread executing the callback for an async item
        *       2. calling thread executing the callback for a queued [....] item
        *       3. calling thread directly calling Execute for a non-queued [....] item 
        */
        [System.Security.SecurityCritical]  // auto-generated 
        internal virtual void Execute() 
        {
            // Execute should be called with the domain policy enforced 
            // i.e. a Synchronization domain should be locked etc ...
            Contract.Assert(IsSignaled(),"IsSignaled()");

            Thread.CurrentThread.InternalCrossContextCallback(_ctx, _xctxDel, new Object[] { this } ); 
        }
        internal virtual IMessage ReplyMessage { get {return _replyMsg;}} 
    } 

    //*************************************** CLIENT SINK ********************************// 

    /*
    *   Implements the client context sink contributed by the
    *   Property. The sink holds a back pointer to the property. 
    *   The sink intercepts outgoing calls from objects the Context
    *   and co-ordinates with the property to enforce the domain policy. 
    */ 
    internal class SynchronizedClientContextSink
            : InternalSink, IMessageSink 
    {
        internal IMessageSink   _nextSink;
        [System.Security.SecurityCritical /*auto-generated*/]
        internal SynchronizationAttribute _property; 

        [System.Security.SecurityCritical]  // auto-generated 
        internal SynchronizedClientContextSink(SynchronizationAttribute prop, IMessageSink nextSink) 
        {
            _property = prop; 
            _nextSink = nextSink;
        }

        [System.Security.SecuritySafeCritical]  // auto-generated 
        ~SynchronizedClientContextSink()
        { 
            _property.Dispose(); 
        }
 
        /*
        *   Implements IMessageSink::SyncProcessMessage for the call-out sink
        */
        [System.Security.SecurityCritical]  // auto-generated 
        public virtual IMessage SyncProcessMessage(IMessage reqMsg)
        { 
            Contract.Assert(_property.Locked == true,"_property.Locked == true"); 
            IMessage replyMsg;
            if (_property.IsReEntrant) 
            {
                // In this case we are required to let anybody waiting for
                // the domain to enter and execute
                // Notify the property that we are leaving 
                _property.HandleThreadExit();
 
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] R: [....] call-out"); 
                replyMsg = _nextSink.SyncProcessMessage(reqMsg);
 
                // We will just block till we are given permission to re-enter
                // Notify the property that we wish to re-enter the domain.
                // This will block the thread here if someone is in the domain.
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] R: [....] call-out returned, waiting for lock"); 
                _property.HandleThreadReEntry();
                Contract.Assert(_property.Locked == true,"_property.Locked == true"); 
            } 
            else
            { 
                // In the non-reentrant case we are just a pass-through sink
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] NR: [....] call-out (pass through)");
                // We should mark the domain with our LCID so that call-backs are allowed to enter..
                LogicalCallContext cctx = 
                    (LogicalCallContext) reqMsg.Properties[Message.CallContextKey];
 
                String lcid = cctx.RemotingData.LogicalCallID; 
                bool bClear = false;
                if (lcid == null) 
                {
                    // We used to assign call-ids in RemotingProxy.cs at the
                    // start of each Invoke. As an optimization we now do it
                    // here in a delayed fashion... since currently only 
                    // Synchronization needs it
                    // Note that for [....]-calls we would just inherit an LCID 
                    // if the call has one, if not we create one. However for 
                    // async calls we always generate a new LCID.
                    lcid = Identity.GetNewLogicalCallID(); 
                    cctx.RemotingData.LogicalCallID = lcid;
                    bClear = true;

                    Contract.Assert( 
                        _property.SyncCallOutLCID == null,
                        "Synchronization domain is already in a callOut state"); 
                } 

                bool bTopLevel=false; 
                if (_property.SyncCallOutLCID==null)
                {
                    _property.SyncCallOutLCID = lcid;
                    bTopLevel = true; 
                }
 
                Contract.Assert(lcid.Equals(_property.SyncCallOutLCID), "Bad synchronization domain state!"); 

                replyMsg = _nextSink.SyncProcessMessage(reqMsg); 

                // if a top level call out returned we clear the callId in the domain
                if (bTopLevel)
                { 
                    _property.SyncCallOutLCID = null;
 
                    // The [....] callOut is done, we do not need the lcid 
                    // that was associated with the call any more.
                    // (clear it only if we added one to the reqMsg) 
                    if (bClear)
                    {
                        // Note that we make changes to the callCtx in
                        // the reply message ... since this is the one that 
                        // will get installed back on the thread that called
                        // the proxy. 
                        LogicalCallContext cctxRet = 
                            (LogicalCallContext) replyMsg.Properties[Message.CallContextKey];
                        Contract.Assert( 
                            cctxRet != null,
                            "CallContext should be non-null");
                        cctxRet.RemotingData.LogicalCallID = null;
                    } 
                }
 
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] NR: [....] call-out returned"); 
            }
            return replyMsg; 
        }

        /*
        *   Implements IMessageSink::AsyncProcessMessage for the call-out sink 
        */
        [System.Security.SecurityCritical]  // auto-generated 
        public virtual IMessageCtrl AsyncProcessMessage(IMessage reqMsg, IMessageSink replySink) 
        {
            IMessageCtrl msgCtrl = null; 

            Contract.Assert(_property.Locked == true,"_property.Locked == true");

            if (!_property.IsReEntrant) 
            {
                // In this case new calls are not allowed to enter the domain 
                // We need to track potentially more than one async-call-outs 
                // and allow the completion notifications to come in for those
 
                LogicalCallContext cctx =
                    (LogicalCallContext) reqMsg.Properties[Message.CallContextKey];
                // We used to generate a new lcid automatically in RemotingProxy
                // Invoke at the start of each Async call. 
                // However now we do it here as an optimization (since only
                // Synchronization needs it) 
                // RemotingProxy invoke code does Clone() the callContext at 
                // the start of each Async call so we don't have to worry
                // about stomping someone else's lcid here. 


                String lcid =  Identity.GetNewLogicalCallID();
                cctx.RemotingData.LogicalCallID = lcid; 

 
                Contract.Assert( 
                    _property.SyncCallOutLCID == null,
                    "Cannot handle async call outs when already in a top-level [....] call out"); 
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] NR: Async CallOut: adding to lcidList: " + lcid);
                _property.AsyncCallOutLCIDList.Add(lcid);
            }
            // We will call AsyncProcessMessage directly on this thread 
            // since the thread should not block much. However, we will
            // have to intercept the callback on the replySink chain for 
            // which we wrap the caller provided replySink into our sink. 
            AsyncReplySink mySink = new AsyncReplySink(replySink, _property);
 
            // NOTE: we will need to yield the Synchronization Domain at
            // some time or another to get our own callBack to complete.

            // Note that for the Async call-outs we have to provide an interception 
            // sink whether we are re-entrant or not since we want
            // the replySink.SyncProcessMessage call to be wait for the lock just like 
            // any other call-in. 
            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] Async call-out");
 
            msgCtrl = _nextSink.AsyncProcessMessage(reqMsg, (IMessageSink)mySink);
            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] Async call-out AsyncPM returned, reply to come separately");

            return msgCtrl; 
        }
 
        /* 
        *   Implements IMessageSink::GetNextSink
        */ 
        public IMessageSink NextSink
        {
            [System.Security.SecurityCritical]  // auto-generated
            get 
            {
                return _nextSink; 
            } 

        } 

        /*
        *   This class just implements the CallBack sink we provide to
        *   intercept the callback of an Async out-call. The CallBack sink 
        *   ensures that arbitrary threads do not enter our Synchronization
        *   Domain without asking us if it is Ok! 
        */ 
        internal class AsyncReplySink : IMessageSink
        { 
            internal IMessageSink _nextSink;
            [System.Security.SecurityCritical /*auto-generated*/]
            internal SynchronizationAttribute _property;
            [System.Security.SecurityCritical]  // auto-generated 
            internal AsyncReplySink(IMessageSink nextSink, SynchronizationAttribute prop)
            { 
                _nextSink = nextSink; 
                _property = prop;
            } 

            [System.Security.SecurityCritical]  // auto-generated
            public virtual IMessage SyncProcessMessage(IMessage reqMsg)
            { 

                // We handle this as a regular new [....] workItem 
                // 1. Create a work item 
                WorkItem work = new WorkItem(reqMsg,
                                            _nextSink, 
                                            null /* replySink */);

                // 2. Notify the property to handle the WorkItem
                // The work item may get put in a Queue or may execute right away. 
                _property.HandleWorkRequest(work);
 
                if (!_property.IsReEntrant) 
                {
                    // Remove the async lcid we had added to the call out list. 
                    //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] NR: InterceptionSink::SyncPM Removing async call-out lcid: " + ((LogicalCallContext)reqMsg.Properties[Message.CallContextKey]).RemotingData.LogicalCallID);
                    _property.AsyncCallOutLCIDList.Remove(
                        ((LogicalCallContext)reqMsg.Properties[Message.CallContextKey]).RemotingData.LogicalCallID);
                } 

                // 3. Pick up retMsg from the WorkItem and return 
                return work.ReplyMessage; 
            }
 
            [System.Security.SecurityCritical]  // auto-generated
            public virtual IMessageCtrl AsyncProcessMessage(IMessage reqMsg, IMessageSink replySink)
            {
                throw new NotSupportedException(); 
            }
 
            /* 
            * Implements IMessageSink::GetNextSink
            */ 
            public IMessageSink NextSink
            {
                [System.Security.SecurityCritical]  // auto-generated
                get 
                {
                    return _nextSink; 
                } 
            }
        } 
    }

}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK