InputQueue.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / System.Runtime.DurableInstancing / System / Runtime / InputQueue.cs / 1305376 / InputQueue.cs

                            //------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------

namespace System.Runtime 
{
    using System; 
    using System.Collections.Generic; 
    using System.Threading;
 
    [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.PrivatePrimitive, SupportsAsync = true, ReleaseMethod = "Dispatch")]
    sealed class InputQueue : IDisposable where T : class
    {
        static Action completeOutstandingReadersCallback; 
        static Action completeWaitersFalseCallback;
        static Action completeWaitersTrueCallback; 
        static Action onDispatchCallback; 
        static Action onInvokeDequeuedCallback;
 
        QueueState queueState;

        [Fx.Tag.SynchronizationObject(Blocking = false, Kind = Fx.Tag.SynchronizationKind.LockStatement)]
        ItemQueue itemQueue; 

        [Fx.Tag.SynchronizationObject] 
        Queue readerQueue; 

        [Fx.Tag.SynchronizationObject] 
        List waiterList;

        public InputQueue()
        { 
            this.itemQueue = new ItemQueue();
            this.readerQueue = new Queue(); 
            this.waiterList = new List(); 
            this.queueState = QueueState.Open;
        } 

        public InputQueue(Func> asyncCallbackGenerator)
            : this()
        { 
            Fx.Assert(asyncCallbackGenerator != null, "use default ctor if you don't have a generator");
            AsyncCallbackGenerator = asyncCallbackGenerator; 
        } 

        public int PendingCount 
        {
            get
            {
                lock (ThisLock) 
                {
                    return this.itemQueue.ItemCount; 
                } 
            }
        } 

        // Users like ServiceModel can hook this abort ICommunicationObject or handle other non-IDisposable objects
        public Action DisposeItemCallback
        { 
            get;
            set; 
        } 

        // Users like ServiceModel can hook this to wrap the AsyncQueueReader callback functionality for tracing, etc 
        Func> AsyncCallbackGenerator
        {
            get;
            set; 
        }
 
        object ThisLock 
        {
            get { return this.itemQueue; } 
        }

        public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
        { 
            Item item = default(Item);
 
            lock (ThisLock) 
            {
                if (queueState == QueueState.Open) 
                {
                    if (itemQueue.HasAvailableItem)
                    {
                        item = itemQueue.DequeueAvailableItem(); 
                    }
                    else 
                    { 
                        AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
                        readerQueue.Enqueue(reader); 
                        return reader;
                    }
                }
                else if (queueState == QueueState.Shutdown) 
                {
                    if (itemQueue.HasAvailableItem) 
                    { 
                        item = itemQueue.DequeueAvailableItem();
                    } 
                    else if (itemQueue.HasAnyItem)
                    {
                        AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
                        readerQueue.Enqueue(reader); 
                        return reader;
                    } 
                } 
            }
 
            InvokeDequeuedCallback(item.DequeuedCallback);
            return new CompletedAsyncResult(item.GetValue(), callback, state);
        }
 
        public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state)
        { 
            lock (ThisLock) 
            {
                if (queueState == QueueState.Open) 
                {
                    if (!itemQueue.HasAvailableItem)
                    {
                        AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state); 
                        waiterList.Add(waiter);
                        return waiter; 
                    } 
                }
                else if (queueState == QueueState.Shutdown) 
                {
                    if (!itemQueue.HasAvailableItem && itemQueue.HasAnyItem)
                    {
                        AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state); 
                        waiterList.Add(waiter);
                        return waiter; 
                    } 
                }
            } 

            return new CompletedAsyncResult(true, callback, state);
        }
 
        public void Close()
        { 
            Dispose(); 
        }
 
        [Fx.Tag.Blocking(CancelMethod = "Close")]
        public T Dequeue(TimeSpan timeout)
        {
            T value; 

            if (!this.Dequeue(timeout, out value)) 
            { 
                throw Fx.Exception.AsError(new TimeoutException(SRCore.TimeoutInputQueueDequeue(timeout)));
            } 

            return value;
        }
 
        [Fx.Tag.Blocking(CancelMethod = "Close")]
        public bool Dequeue(TimeSpan timeout, out T value) 
        { 
            WaitQueueReader reader = null;
            Item item = new Item(); 

            lock (ThisLock)
            {
                if (queueState == QueueState.Open) 
                {
                    if (itemQueue.HasAvailableItem) 
                    { 
                        item = itemQueue.DequeueAvailableItem();
                    } 
                    else
                    {
                        reader = new WaitQueueReader(this);
                        readerQueue.Enqueue(reader); 
                    }
                } 
                else if (queueState == QueueState.Shutdown) 
                {
                    if (itemQueue.HasAvailableItem) 
                    {
                        item = itemQueue.DequeueAvailableItem();
                    }
                    else if (itemQueue.HasAnyItem) 
                    {
                        reader = new WaitQueueReader(this); 
                        readerQueue.Enqueue(reader); 
                    }
                    else 
                    {
                        value = default(T);
                        return true;
                    } 
                }
                else // queueState == QueueState.Closed 
                { 
                    value = default(T);
                    return true; 
                }
            }

            if (reader != null) 
            {
                return reader.Wait(timeout, out value); 
            } 
            else
            { 
                InvokeDequeuedCallback(item.DequeuedCallback);
                value = item.GetValue();
                return true;
            } 
        }
 
        public void Dispatch() 
        {
            IQueueReader reader = null; 
            Item item = new Item();
            IQueueReader[] outstandingReaders = null;
            IQueueWaiter[] waiters = null;
            bool itemAvailable = true; 

            lock (ThisLock) 
            { 
                itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
                this.GetWaiters(out waiters); 

                if (queueState != QueueState.Closed)
                {
                    itemQueue.MakePendingItemAvailable(); 

                    if (readerQueue.Count > 0) 
                    { 
                        item = itemQueue.DequeueAvailableItem();
                        reader = readerQueue.Dequeue(); 

                        if (queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0)
                        {
                            outstandingReaders = new IQueueReader[readerQueue.Count]; 
                            readerQueue.CopyTo(outstandingReaders, 0);
                            readerQueue.Clear(); 
 
                            itemAvailable = false;
                        } 
                    }
                }
            }
 
            if (outstandingReaders != null)
            { 
                if (completeOutstandingReadersCallback == null) 
                {
                    completeOutstandingReadersCallback = new Action(CompleteOutstandingReadersCallback); 
                }

                ActionItem.Schedule(completeOutstandingReadersCallback, outstandingReaders);
            } 

            if (waiters != null) 
            { 
                CompleteWaitersLater(itemAvailable, waiters);
            } 

            if (reader != null)
            {
                InvokeDequeuedCallback(item.DequeuedCallback); 
                reader.Set(item);
            } 
        } 

        [Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")] 
        public bool EndDequeue(IAsyncResult result, out T value)
        {
            CompletedAsyncResult typedResult = result as CompletedAsyncResult;
 
            if (typedResult != null)
            { 
                value = CompletedAsyncResult.End(result); 
                return true;
            } 

            return AsyncQueueReader.End(result, out value);
        }
 
        [Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")]
        public T EndDequeue(IAsyncResult result) 
        { 
            T value;
 
            if (!this.EndDequeue(result, out value))
            {
                throw Fx.Exception.AsError(new TimeoutException());
            } 

            return value; 
        } 

        [Fx.Tag.Blocking(CancelMethod = "Dispatch", Conditional = "!result.IsCompleted")] 
        public bool EndWaitForItem(IAsyncResult result)
        {
            CompletedAsyncResult typedResult = result as CompletedAsyncResult;
            if (typedResult != null) 
            {
                return CompletedAsyncResult.End(result); 
            } 

            return AsyncQueueWaiter.End(result); 
        }

        public void EnqueueAndDispatch(T item)
        { 
            EnqueueAndDispatch(item, null);
        } 
 
        // dequeuedCallback is called as an item is dequeued from the InputQueue.  The
        // InputQueue lock is not held during the callback.  However, the user code will 
        // not be notified of the item being available until the callback returns.  If you
        // are not sure if the callback will block for a long time, then first call
        // IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
        public void EnqueueAndDispatch(T item, Action dequeuedCallback) 
        {
            EnqueueAndDispatch(item, dequeuedCallback, true); 
        } 

        public void EnqueueAndDispatch(Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread) 
        {
            Fx.Assert(exception != null, "EnqueueAndDispatch: exception parameter should not be null");
            EnqueueAndDispatch(new Item(exception, dequeuedCallback), canDispatchOnThisThread);
        } 

        public void EnqueueAndDispatch(T item, Action dequeuedCallback, bool canDispatchOnThisThread) 
        { 
            Fx.Assert(item != null, "EnqueueAndDispatch: item parameter should not be null");
            EnqueueAndDispatch(new Item(item, dequeuedCallback), canDispatchOnThisThread); 
        }

        public bool EnqueueWithoutDispatch(T item, Action dequeuedCallback)
        { 
            Fx.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null");
            return EnqueueWithoutDispatch(new Item(item, dequeuedCallback)); 
        } 

        public bool EnqueueWithoutDispatch(Exception exception, Action dequeuedCallback) 
        {
            Fx.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null");
            return EnqueueWithoutDispatch(new Item(exception, dequeuedCallback));
        } 

 
        public void Shutdown() 
        {
            this.Shutdown(null); 
        }

        // Don't let any more items in. Differs from Close in that we keep around
        // existing items in our itemQueue for possible future calls to Dequeue 
        public void Shutdown(Func pendingExceptionGenerator)
        { 
            IQueueReader[] outstandingReaders = null; 
            lock (ThisLock)
            { 
                if (queueState == QueueState.Shutdown)
                {
                    return;
                } 

                if (queueState == QueueState.Closed) 
                { 
                    return;
                } 

                this.queueState = QueueState.Shutdown;

                if (readerQueue.Count > 0 && this.itemQueue.ItemCount == 0) 
                {
                    outstandingReaders = new IQueueReader[readerQueue.Count]; 
                    readerQueue.CopyTo(outstandingReaders, 0); 
                    readerQueue.Clear();
                } 
            }

            if (outstandingReaders != null)
            { 
                for (int i = 0; i < outstandingReaders.Length; i++)
                { 
                    Exception exception = (pendingExceptionGenerator != null) ? pendingExceptionGenerator() : null; 
                    outstandingReaders[i].Set(new Item(exception, null));
                } 
            }
        }

        [Fx.Tag.Blocking(CancelMethod = "Dispatch")] 
        public bool WaitForItem(TimeSpan timeout)
        { 
            WaitQueueWaiter waiter = null; 
            bool itemAvailable = false;
 
            lock (ThisLock)
            {
                if (queueState == QueueState.Open)
                { 
                    if (itemQueue.HasAvailableItem)
                    { 
                        itemAvailable = true; 
                    }
                    else 
                    {
                        waiter = new WaitQueueWaiter();
                        waiterList.Add(waiter);
                    } 
                }
                else if (queueState == QueueState.Shutdown) 
                { 
                    if (itemQueue.HasAvailableItem)
                    { 
                        itemAvailable = true;
                    }
                    else if (itemQueue.HasAnyItem)
                    { 
                        waiter = new WaitQueueWaiter();
                        waiterList.Add(waiter); 
                    } 
                    else
                    { 
                        return true;
                    }
                }
                else // queueState == QueueState.Closed 
                {
                    return true; 
                } 
            }
 
            if (waiter != null)
            {
                return waiter.Wait(timeout);
            } 
            else
            { 
                return itemAvailable; 
            }
        } 

        public void Dispose()
        {
            bool dispose = false; 

            lock (ThisLock) 
            { 
                if (queueState != QueueState.Closed)
                { 
                    queueState = QueueState.Closed;
                    dispose = true;
                }
            } 

            if (dispose) 
            { 
                while (readerQueue.Count > 0)
                { 
                    IQueueReader reader = readerQueue.Dequeue();
                    reader.Set(default(Item));
                }
 
                while (itemQueue.HasAnyItem)
                { 
                    Item item = itemQueue.DequeueAnyItem(); 
                    DisposeItem(item);
                    InvokeDequeuedCallback(item.DequeuedCallback); 
                }
            }
        }
 
        void DisposeItem(Item item)
        { 
            T value = item.Value; 
            if (value != null)
            { 
                if (value is IDisposable)
                {
                    ((IDisposable)value).Dispose();
                } 
                else
                { 
                    Action disposeItemCallback = this.DisposeItemCallback; 
                    if (disposeItemCallback != null)
                    { 
                        disposeItemCallback(value);
                    }
                }
            } 
        }
 
        static void CompleteOutstandingReadersCallback(object state) 
        {
            IQueueReader[] outstandingReaders = (IQueueReader[])state; 

            for (int i = 0; i < outstandingReaders.Length; i++)
            {
                outstandingReaders[i].Set(default(Item)); 
            }
        } 
 
        static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters)
        { 
            for (int i = 0; i < waiters.Length; i++)
            {
                waiters[i].Set(itemAvailable);
            } 
        }
 
        static void CompleteWaitersFalseCallback(object state) 
        {
            CompleteWaiters(false, (IQueueWaiter[])state); 
        }

        static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters)
        { 
            if (itemAvailable)
            { 
                if (completeWaitersTrueCallback == null) 
                {
                    completeWaitersTrueCallback = new Action(CompleteWaitersTrueCallback); 
                }

                ActionItem.Schedule(completeWaitersTrueCallback, waiters);
            } 
            else
            { 
                if (completeWaitersFalseCallback == null) 
                {
                    completeWaitersFalseCallback = new Action(CompleteWaitersFalseCallback); 
                }

                ActionItem.Schedule(completeWaitersFalseCallback, waiters);
            } 
        }
 
        static void CompleteWaitersTrueCallback(object state) 
        {
            CompleteWaiters(true, (IQueueWaiter[])state); 
        }

        static void InvokeDequeuedCallback(Action dequeuedCallback)
        { 
            if (dequeuedCallback != null)
            { 
                dequeuedCallback(); 
            }
        } 

        static void InvokeDequeuedCallbackLater(Action dequeuedCallback)
        {
            if (dequeuedCallback != null) 
            {
                if (onInvokeDequeuedCallback == null) 
                { 
                    onInvokeDequeuedCallback = new Action(OnInvokeDequeuedCallback);
                } 

                ActionItem.Schedule(onInvokeDequeuedCallback, dequeuedCallback);
            }
        } 

        static void OnDispatchCallback(object state) 
        { 
            ((InputQueue)state).Dispatch();
        } 

        static void OnInvokeDequeuedCallback(object state)
        {
            Fx.Assert(state != null, "InputQueue.OnInvokeDequeuedCallback: (state != null)"); 

            Action dequeuedCallback = (Action)state; 
            dequeuedCallback(); 
        }
 
        void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread)
        {
            bool disposeItem = false;
            IQueueReader reader = null; 
            bool dispatchLater = false;
            IQueueWaiter[] waiters = null; 
            bool itemAvailable = true; 

            lock (ThisLock) 
            {
                itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
                this.GetWaiters(out waiters);
 
                if (queueState == QueueState.Open)
                { 
                    if (canDispatchOnThisThread) 
                    {
                        if (readerQueue.Count == 0) 
                        {
                            itemQueue.EnqueueAvailableItem(item);
                        }
                        else 
                        {
                            reader = readerQueue.Dequeue(); 
                        } 
                    }
                    else 
                    {
                        if (readerQueue.Count == 0)
                        {
                            itemQueue.EnqueueAvailableItem(item); 
                        }
                        else 
                        { 
                            itemQueue.EnqueuePendingItem(item);
                            dispatchLater = true; 
                        }
                    }
                }
                else // queueState == QueueState.Closed || queueState == QueueState.Shutdown 
                {
                    disposeItem = true; 
                } 
            }
 
            if (waiters != null)
            {
                if (canDispatchOnThisThread)
                { 
                    CompleteWaiters(itemAvailable, waiters);
                } 
                else 
                {
                    CompleteWaitersLater(itemAvailable, waiters); 
                }
            }

            if (reader != null) 
            {
                InvokeDequeuedCallback(item.DequeuedCallback); 
                reader.Set(item); 
            }
 
            if (dispatchLater)
            {
                if (onDispatchCallback == null)
                { 
                    onDispatchCallback = new Action(OnDispatchCallback);
                } 
 
                ActionItem.Schedule(onDispatchCallback, this);
            } 
            else if (disposeItem)
            {
                InvokeDequeuedCallback(item.DequeuedCallback);
                DisposeItem(item); 
            }
        } 
 
        // This will not block, however, Dispatch() must be called later if this function
        // returns true. 
        bool EnqueueWithoutDispatch(Item item)
        {
            lock (ThisLock)
            { 
                // Open
                if (queueState != QueueState.Closed && queueState != QueueState.Shutdown) 
                { 
                    if (readerQueue.Count == 0 && waiterList.Count == 0)
                    { 
                        itemQueue.EnqueueAvailableItem(item);
                        return false;
                    }
                    else 
                    {
                        itemQueue.EnqueuePendingItem(item); 
                        return true; 
                    }
                } 
            }

            DisposeItem(item);
            InvokeDequeuedCallbackLater(item.DequeuedCallback); 
            return false;
        } 
 
        void GetWaiters(out IQueueWaiter[] waiters)
        { 
            if (waiterList.Count > 0)
            {
                waiters = waiterList.ToArray();
                waiterList.Clear(); 
            }
            else 
            { 
                waiters = null;
            } 
        }

        // Used for timeouts. The InputQueue must remove readers from its reader queue to prevent
        // dispatching items to timed out readers. 
        bool RemoveReader(IQueueReader reader)
        { 
            Fx.Assert(reader != null, "InputQueue.RemoveReader: (reader != null)"); 

            lock (ThisLock) 
            {
                if (queueState == QueueState.Open || queueState == QueueState.Shutdown)
                {
                    bool removed = false; 

                    for (int i = readerQueue.Count; i > 0; i--) 
                    { 
                        IQueueReader temp = readerQueue.Dequeue();
                        if (object.ReferenceEquals(temp, reader)) 
                        {
                            removed = true;
                        }
                        else 
                        {
                            readerQueue.Enqueue(temp); 
                        } 
                    }
 
                    return removed;
                }
            }
 
            return false;
        } 
 
        enum QueueState
        { 
            Open,
            Shutdown,
            Closed
        } 

        interface IQueueReader 
        { 
            void Set(Item item);
        } 

        interface IQueueWaiter
        {
            void Set(bool itemAvailable); 
        }
 
        struct Item 
        {
            Action dequeuedCallback; 
            Exception exception;
            T value;

            public Item(T value, Action dequeuedCallback) 
                : this(value, null, dequeuedCallback)
            { 
            } 

            public Item(Exception exception, Action dequeuedCallback) 
                : this(null, exception, dequeuedCallback)
            {
            }
 
            Item(T value, Exception exception, Action dequeuedCallback)
            { 
                this.value = value; 
                this.exception = exception;
                this.dequeuedCallback = dequeuedCallback; 
            }

            public Action DequeuedCallback
            { 
                get { return this.dequeuedCallback; }
            } 
 
            public Exception Exception
            { 
                get { return this.exception; }
            }

            public T Value 
            {
                get { return this.value; } 
            } 

            public T GetValue() 
            {
                if (this.exception != null)
                {
                    throw Fx.Exception.AsError(this.exception); 
                }
 
                return this.value; 
            }
        } 

        [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
        class AsyncQueueReader : AsyncResult, IQueueReader
        { 
            static Action timerCallback = new Action(AsyncQueueReader.TimerCallback);
 
            bool expired; 
            InputQueue inputQueue;
            T item; 
            IOThreadTimer timer;

            public AsyncQueueReader(InputQueue inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state) 
            {
                if (inputQueue.AsyncCallbackGenerator != null) 
                { 
                    base.VirtualCallback = inputQueue.AsyncCallbackGenerator();
                } 
                this.inputQueue = inputQueue;
                if (timeout != TimeSpan.MaxValue)
                {
                    this.timer = new IOThreadTimer(timerCallback, this, false); 
                    this.timer.Set(timeout);
                } 
            } 

            [Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")] 
            public static bool End(IAsyncResult result, out T value)
            {
                AsyncQueueReader readerResult = AsyncResult.End(result);
 
                if (readerResult.expired)
                { 
                    value = default(T); 
                    return false;
                } 
                else
                {
                    value = readerResult.item;
                    return true; 
                }
            } 
 
            public void Set(Item item)
            { 
                this.item = item.Value;
                if (this.timer != null)
                {
                    this.timer.Cancel(); 
                }
                Complete(false, item.Exception); 
            } 

            static void TimerCallback(object state) 
            {
                AsyncQueueReader thisPtr = (AsyncQueueReader)state;
                if (thisPtr.inputQueue.RemoveReader(thisPtr))
                { 
                    thisPtr.expired = true;
                    thisPtr.Complete(false); 
                } 
            }
        } 

        [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
        class AsyncQueueWaiter : AsyncResult, IQueueWaiter
        { 
            static Action timerCallback = new Action(AsyncQueueWaiter.TimerCallback);
            bool itemAvailable; 
 
            [Fx.Tag.SynchronizationObject(Blocking = false)]
            object thisLock = new object(); 

            IOThreadTimer timer;

            public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) 
            {
                if (timeout != TimeSpan.MaxValue) 
                { 
                    this.timer = new IOThreadTimer(timerCallback, this, false);
                    this.timer.Set(timeout); 
                }
            }

            object ThisLock 
            {
                get 
                { 
                    return this.thisLock;
                } 
            }

            [Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")]
            public static bool End(IAsyncResult result) 
            {
                AsyncQueueWaiter waiterResult = AsyncResult.End(result); 
                return waiterResult.itemAvailable; 
            }
 
            public void Set(bool itemAvailable)
            {
                bool timely;
 
                lock (ThisLock)
                { 
                    timely = (this.timer == null) || this.timer.Cancel(); 
                    this.itemAvailable = itemAvailable;
                } 

                if (timely)
                {
                    Complete(false); 
                }
            } 
 
            static void TimerCallback(object state)
            { 
                AsyncQueueWaiter thisPtr = (AsyncQueueWaiter)state;
                thisPtr.Complete(false);
            }
        } 

        class ItemQueue 
        { 
            int head;
            Item[] items; 
            int pendingCount;
            int totalCount;

            public ItemQueue() 
            {
                this.items = new Item[1]; 
            } 

            public bool HasAnyItem 
            {
                get { return this.totalCount > 0; }
            }
 
            public bool HasAvailableItem
            { 
                get { return this.totalCount > this.pendingCount; } 
            }
 
            public int ItemCount
            {
                get { return this.totalCount; }
            } 

            public Item DequeueAnyItem() 
            { 
                if (this.pendingCount == this.totalCount)
                { 
                    this.pendingCount--;
                }
                return DequeueItemCore();
            } 

            public Item DequeueAvailableItem() 
            { 
                Fx.AssertAndThrow(this.totalCount != this.pendingCount, "ItemQueue does not contain any available items");
                return DequeueItemCore(); 
            }

            public void EnqueueAvailableItem(Item item)
            { 
                EnqueueItemCore(item);
            } 
 
            public void EnqueuePendingItem(Item item)
            { 
                EnqueueItemCore(item);
                this.pendingCount++;
            }
 
            public void MakePendingItemAvailable()
            { 
                Fx.AssertAndThrow(this.pendingCount != 0, "ItemQueue does not contain any pending items"); 
                this.pendingCount--;
            } 

            Item DequeueItemCore()
            {
                Fx.AssertAndThrow(totalCount != 0, "ItemQueue does not contain any items"); 
                Item item = this.items[this.head];
                this.items[this.head] = new Item(); 
                this.totalCount--; 
                this.head = (this.head + 1) % this.items.Length;
                return item; 
            }

            void EnqueueItemCore(Item item)
            { 
                if (this.totalCount == this.items.Length)
                { 
                    Item[] newItems = new Item[this.items.Length * 2]; 
                    for (int i = 0; i < this.totalCount; i++)
                    { 
                        newItems[i] = this.items[(head + i) % this.items.Length];
                    }
                    this.head = 0;
                    this.items = newItems; 
                }
                int tail = (this.head + this.totalCount) % this.items.Length; 
                this.items[tail] = item; 
                this.totalCount++;
            } 
        }

        [Fx.Tag.SynchronizationObject(Blocking = false)]
        [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")] 
        class WaitQueueReader : IQueueReader
        { 
            Exception exception; 
            InputQueue inputQueue;
            T item; 

            [Fx.Tag.SynchronizationObject]
            ManualResetEvent waitEvent;
 
            public WaitQueueReader(InputQueue inputQueue)
            { 
                this.inputQueue = inputQueue; 
                waitEvent = new ManualResetEvent(false);
            } 

            public void Set(Item item)
            {
                lock (this) 
                {
                    Fx.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)"); 
                    Fx.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)"); 

                    this.exception = item.Exception; 
                    this.item = item.Value;
                    waitEvent.Set();
                }
            } 

            [Fx.Tag.Blocking(CancelMethod = "Set")] 
            public bool Wait(TimeSpan timeout, out T value) 
            {
                bool isSafeToClose = false; 
                try
                {
                    if (!TimeoutHelper.WaitOne(waitEvent, timeout))
                    { 
                        if (this.inputQueue.RemoveReader(this))
                        { 
                            value = default(T); 
                            isSafeToClose = true;
                            return false; 
                        }
                        else
                        {
                            waitEvent.WaitOne(); 
                        }
                    } 
 
                    isSafeToClose = true;
                } 
                finally
                {
                    if (isSafeToClose)
                    { 
                        waitEvent.Close();
                    } 
                } 

                if (this.exception != null) 
                {
                    throw Fx.Exception.AsError(this.exception);
                }
 
                value = item;
                return true; 
            } 
        }
 
        [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")]
        class WaitQueueWaiter : IQueueWaiter
        {
            bool itemAvailable; 

            [Fx.Tag.SynchronizationObject] 
            ManualResetEvent waitEvent; 

            public WaitQueueWaiter() 
            {
                waitEvent = new ManualResetEvent(false);
            }
 
            public void Set(bool itemAvailable)
            { 
                lock (this) 
                {
                    this.itemAvailable = itemAvailable; 
                    waitEvent.Set();
                }
            }
 
            [Fx.Tag.Blocking(CancelMethod = "Set")]
            public bool Wait(TimeSpan timeout) 
            { 
                if (!TimeoutHelper.WaitOne(waitEvent, timeout))
                { 
                    return false;
                }

                return this.itemAvailable; 
            }
        } 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
//------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------

namespace System.Runtime 
{
    using System; 
    using System.Collections.Generic; 
    using System.Threading;
 
    [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.PrivatePrimitive, SupportsAsync = true, ReleaseMethod = "Dispatch")]
    sealed class InputQueue : IDisposable where T : class
    {
        static Action completeOutstandingReadersCallback; 
        static Action completeWaitersFalseCallback;
        static Action completeWaitersTrueCallback; 
        static Action onDispatchCallback; 
        static Action onInvokeDequeuedCallback;
 
        QueueState queueState;

        [Fx.Tag.SynchronizationObject(Blocking = false, Kind = Fx.Tag.SynchronizationKind.LockStatement)]
        ItemQueue itemQueue; 

        [Fx.Tag.SynchronizationObject] 
        Queue readerQueue; 

        [Fx.Tag.SynchronizationObject] 
        List waiterList;

        public InputQueue()
        { 
            this.itemQueue = new ItemQueue();
            this.readerQueue = new Queue(); 
            this.waiterList = new List(); 
            this.queueState = QueueState.Open;
        } 

        public InputQueue(Func> asyncCallbackGenerator)
            : this()
        { 
            Fx.Assert(asyncCallbackGenerator != null, "use default ctor if you don't have a generator");
            AsyncCallbackGenerator = asyncCallbackGenerator; 
        } 

        public int PendingCount 
        {
            get
            {
                lock (ThisLock) 
                {
                    return this.itemQueue.ItemCount; 
                } 
            }
        } 

        // Users like ServiceModel can hook this abort ICommunicationObject or handle other non-IDisposable objects
        public Action DisposeItemCallback
        { 
            get;
            set; 
        } 

        // Users like ServiceModel can hook this to wrap the AsyncQueueReader callback functionality for tracing, etc 
        Func> AsyncCallbackGenerator
        {
            get;
            set; 
        }
 
        object ThisLock 
        {
            get { return this.itemQueue; } 
        }

        public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
        { 
            Item item = default(Item);
 
            lock (ThisLock) 
            {
                if (queueState == QueueState.Open) 
                {
                    if (itemQueue.HasAvailableItem)
                    {
                        item = itemQueue.DequeueAvailableItem(); 
                    }
                    else 
                    { 
                        AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
                        readerQueue.Enqueue(reader); 
                        return reader;
                    }
                }
                else if (queueState == QueueState.Shutdown) 
                {
                    if (itemQueue.HasAvailableItem) 
                    { 
                        item = itemQueue.DequeueAvailableItem();
                    } 
                    else if (itemQueue.HasAnyItem)
                    {
                        AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
                        readerQueue.Enqueue(reader); 
                        return reader;
                    } 
                } 
            }
 
            InvokeDequeuedCallback(item.DequeuedCallback);
            return new CompletedAsyncResult(item.GetValue(), callback, state);
        }
 
        public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state)
        { 
            lock (ThisLock) 
            {
                if (queueState == QueueState.Open) 
                {
                    if (!itemQueue.HasAvailableItem)
                    {
                        AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state); 
                        waiterList.Add(waiter);
                        return waiter; 
                    } 
                }
                else if (queueState == QueueState.Shutdown) 
                {
                    if (!itemQueue.HasAvailableItem && itemQueue.HasAnyItem)
                    {
                        AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state); 
                        waiterList.Add(waiter);
                        return waiter; 
                    } 
                }
            } 

            return new CompletedAsyncResult(true, callback, state);
        }
 
        public void Close()
        { 
            Dispose(); 
        }
 
        [Fx.Tag.Blocking(CancelMethod = "Close")]
        public T Dequeue(TimeSpan timeout)
        {
            T value; 

            if (!this.Dequeue(timeout, out value)) 
            { 
                throw Fx.Exception.AsError(new TimeoutException(SRCore.TimeoutInputQueueDequeue(timeout)));
            } 

            return value;
        }
 
        [Fx.Tag.Blocking(CancelMethod = "Close")]
        public bool Dequeue(TimeSpan timeout, out T value) 
        { 
            WaitQueueReader reader = null;
            Item item = new Item(); 

            lock (ThisLock)
            {
                if (queueState == QueueState.Open) 
                {
                    if (itemQueue.HasAvailableItem) 
                    { 
                        item = itemQueue.DequeueAvailableItem();
                    } 
                    else
                    {
                        reader = new WaitQueueReader(this);
                        readerQueue.Enqueue(reader); 
                    }
                } 
                else if (queueState == QueueState.Shutdown) 
                {
                    if (itemQueue.HasAvailableItem) 
                    {
                        item = itemQueue.DequeueAvailableItem();
                    }
                    else if (itemQueue.HasAnyItem) 
                    {
                        reader = new WaitQueueReader(this); 
                        readerQueue.Enqueue(reader); 
                    }
                    else 
                    {
                        value = default(T);
                        return true;
                    } 
                }
                else // queueState == QueueState.Closed 
                { 
                    value = default(T);
                    return true; 
                }
            }

            if (reader != null) 
            {
                return reader.Wait(timeout, out value); 
            } 
            else
            { 
                InvokeDequeuedCallback(item.DequeuedCallback);
                value = item.GetValue();
                return true;
            } 
        }
 
        public void Dispatch() 
        {
            IQueueReader reader = null; 
            Item item = new Item();
            IQueueReader[] outstandingReaders = null;
            IQueueWaiter[] waiters = null;
            bool itemAvailable = true; 

            lock (ThisLock) 
            { 
                itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
                this.GetWaiters(out waiters); 

                if (queueState != QueueState.Closed)
                {
                    itemQueue.MakePendingItemAvailable(); 

                    if (readerQueue.Count > 0) 
                    { 
                        item = itemQueue.DequeueAvailableItem();
                        reader = readerQueue.Dequeue(); 

                        if (queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0)
                        {
                            outstandingReaders = new IQueueReader[readerQueue.Count]; 
                            readerQueue.CopyTo(outstandingReaders, 0);
                            readerQueue.Clear(); 
 
                            itemAvailable = false;
                        } 
                    }
                }
            }
 
            if (outstandingReaders != null)
            { 
                if (completeOutstandingReadersCallback == null) 
                {
                    completeOutstandingReadersCallback = new Action(CompleteOutstandingReadersCallback); 
                }

                ActionItem.Schedule(completeOutstandingReadersCallback, outstandingReaders);
            } 

            if (waiters != null) 
            { 
                CompleteWaitersLater(itemAvailable, waiters);
            } 

            if (reader != null)
            {
                InvokeDequeuedCallback(item.DequeuedCallback); 
                reader.Set(item);
            } 
        } 

        [Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")] 
        public bool EndDequeue(IAsyncResult result, out T value)
        {
            CompletedAsyncResult typedResult = result as CompletedAsyncResult;
 
            if (typedResult != null)
            { 
                value = CompletedAsyncResult.End(result); 
                return true;
            } 

            return AsyncQueueReader.End(result, out value);
        }
 
        [Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")]
        public T EndDequeue(IAsyncResult result) 
        { 
            T value;
 
            if (!this.EndDequeue(result, out value))
            {
                throw Fx.Exception.AsError(new TimeoutException());
            } 

            return value; 
        } 

        [Fx.Tag.Blocking(CancelMethod = "Dispatch", Conditional = "!result.IsCompleted")] 
        public bool EndWaitForItem(IAsyncResult result)
        {
            CompletedAsyncResult typedResult = result as CompletedAsyncResult;
            if (typedResult != null) 
            {
                return CompletedAsyncResult.End(result); 
            } 

            return AsyncQueueWaiter.End(result); 
        }

        public void EnqueueAndDispatch(T item)
        { 
            EnqueueAndDispatch(item, null);
        } 
 
        // dequeuedCallback is called as an item is dequeued from the InputQueue.  The
        // InputQueue lock is not held during the callback.  However, the user code will 
        // not be notified of the item being available until the callback returns.  If you
        // are not sure if the callback will block for a long time, then first call
        // IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
        public void EnqueueAndDispatch(T item, Action dequeuedCallback) 
        {
            EnqueueAndDispatch(item, dequeuedCallback, true); 
        } 

        public void EnqueueAndDispatch(Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread) 
        {
            Fx.Assert(exception != null, "EnqueueAndDispatch: exception parameter should not be null");
            EnqueueAndDispatch(new Item(exception, dequeuedCallback), canDispatchOnThisThread);
        } 

        public void EnqueueAndDispatch(T item, Action dequeuedCallback, bool canDispatchOnThisThread) 
        { 
            Fx.Assert(item != null, "EnqueueAndDispatch: item parameter should not be null");
            EnqueueAndDispatch(new Item(item, dequeuedCallback), canDispatchOnThisThread); 
        }

        public bool EnqueueWithoutDispatch(T item, Action dequeuedCallback)
        { 
            Fx.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null");
            return EnqueueWithoutDispatch(new Item(item, dequeuedCallback)); 
        } 

        public bool EnqueueWithoutDispatch(Exception exception, Action dequeuedCallback) 
        {
            Fx.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null");
            return EnqueueWithoutDispatch(new Item(exception, dequeuedCallback));
        } 

 
        public void Shutdown() 
        {
            this.Shutdown(null); 
        }

        // Don't let any more items in. Differs from Close in that we keep around
        // existing items in our itemQueue for possible future calls to Dequeue 
        public void Shutdown(Func pendingExceptionGenerator)
        { 
            IQueueReader[] outstandingReaders = null; 
            lock (ThisLock)
            { 
                if (queueState == QueueState.Shutdown)
                {
                    return;
                } 

                if (queueState == QueueState.Closed) 
                { 
                    return;
                } 

                this.queueState = QueueState.Shutdown;

                if (readerQueue.Count > 0 && this.itemQueue.ItemCount == 0) 
                {
                    outstandingReaders = new IQueueReader[readerQueue.Count]; 
                    readerQueue.CopyTo(outstandingReaders, 0); 
                    readerQueue.Clear();
                } 
            }

            if (outstandingReaders != null)
            { 
                for (int i = 0; i < outstandingReaders.Length; i++)
                { 
                    Exception exception = (pendingExceptionGenerator != null) ? pendingExceptionGenerator() : null; 
                    outstandingReaders[i].Set(new Item(exception, null));
                } 
            }
        }

        [Fx.Tag.Blocking(CancelMethod = "Dispatch")] 
        public bool WaitForItem(TimeSpan timeout)
        { 
            WaitQueueWaiter waiter = null; 
            bool itemAvailable = false;
 
            lock (ThisLock)
            {
                if (queueState == QueueState.Open)
                { 
                    if (itemQueue.HasAvailableItem)
                    { 
                        itemAvailable = true; 
                    }
                    else 
                    {
                        waiter = new WaitQueueWaiter();
                        waiterList.Add(waiter);
                    } 
                }
                else if (queueState == QueueState.Shutdown) 
                { 
                    if (itemQueue.HasAvailableItem)
                    { 
                        itemAvailable = true;
                    }
                    else if (itemQueue.HasAnyItem)
                    { 
                        waiter = new WaitQueueWaiter();
                        waiterList.Add(waiter); 
                    } 
                    else
                    { 
                        return true;
                    }
                }
                else // queueState == QueueState.Closed 
                {
                    return true; 
                } 
            }
 
            if (waiter != null)
            {
                return waiter.Wait(timeout);
            } 
            else
            { 
                return itemAvailable; 
            }
        } 

        public void Dispose()
        {
            bool dispose = false; 

            lock (ThisLock) 
            { 
                if (queueState != QueueState.Closed)
                { 
                    queueState = QueueState.Closed;
                    dispose = true;
                }
            } 

            if (dispose) 
            { 
                while (readerQueue.Count > 0)
                { 
                    IQueueReader reader = readerQueue.Dequeue();
                    reader.Set(default(Item));
                }
 
                while (itemQueue.HasAnyItem)
                { 
                    Item item = itemQueue.DequeueAnyItem(); 
                    DisposeItem(item);
                    InvokeDequeuedCallback(item.DequeuedCallback); 
                }
            }
        }
 
        void DisposeItem(Item item)
        { 
            T value = item.Value; 
            if (value != null)
            { 
                if (value is IDisposable)
                {
                    ((IDisposable)value).Dispose();
                } 
                else
                { 
                    Action disposeItemCallback = this.DisposeItemCallback; 
                    if (disposeItemCallback != null)
                    { 
                        disposeItemCallback(value);
                    }
                }
            } 
        }
 
        static void CompleteOutstandingReadersCallback(object state) 
        {
            IQueueReader[] outstandingReaders = (IQueueReader[])state; 

            for (int i = 0; i < outstandingReaders.Length; i++)
            {
                outstandingReaders[i].Set(default(Item)); 
            }
        } 
 
        static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters)
        { 
            for (int i = 0; i < waiters.Length; i++)
            {
                waiters[i].Set(itemAvailable);
            } 
        }
 
        static void CompleteWaitersFalseCallback(object state) 
        {
            CompleteWaiters(false, (IQueueWaiter[])state); 
        }

        static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters)
        { 
            if (itemAvailable)
            { 
                if (completeWaitersTrueCallback == null) 
                {
                    completeWaitersTrueCallback = new Action(CompleteWaitersTrueCallback); 
                }

                ActionItem.Schedule(completeWaitersTrueCallback, waiters);
            } 
            else
            { 
                if (completeWaitersFalseCallback == null) 
                {
                    completeWaitersFalseCallback = new Action(CompleteWaitersFalseCallback); 
                }

                ActionItem.Schedule(completeWaitersFalseCallback, waiters);
            } 
        }
 
        static void CompleteWaitersTrueCallback(object state) 
        {
            CompleteWaiters(true, (IQueueWaiter[])state); 
        }

        static void InvokeDequeuedCallback(Action dequeuedCallback)
        { 
            if (dequeuedCallback != null)
            { 
                dequeuedCallback(); 
            }
        } 

        static void InvokeDequeuedCallbackLater(Action dequeuedCallback)
        {
            if (dequeuedCallback != null) 
            {
                if (onInvokeDequeuedCallback == null) 
                { 
                    onInvokeDequeuedCallback = new Action(OnInvokeDequeuedCallback);
                } 

                ActionItem.Schedule(onInvokeDequeuedCallback, dequeuedCallback);
            }
        } 

        static void OnDispatchCallback(object state) 
        { 
            ((InputQueue)state).Dispatch();
        } 

        static void OnInvokeDequeuedCallback(object state)
        {
            Fx.Assert(state != null, "InputQueue.OnInvokeDequeuedCallback: (state != null)"); 

            Action dequeuedCallback = (Action)state; 
            dequeuedCallback(); 
        }
 
        void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread)
        {
            bool disposeItem = false;
            IQueueReader reader = null; 
            bool dispatchLater = false;
            IQueueWaiter[] waiters = null; 
            bool itemAvailable = true; 

            lock (ThisLock) 
            {
                itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
                this.GetWaiters(out waiters);
 
                if (queueState == QueueState.Open)
                { 
                    if (canDispatchOnThisThread) 
                    {
                        if (readerQueue.Count == 0) 
                        {
                            itemQueue.EnqueueAvailableItem(item);
                        }
                        else 
                        {
                            reader = readerQueue.Dequeue(); 
                        } 
                    }
                    else 
                    {
                        if (readerQueue.Count == 0)
                        {
                            itemQueue.EnqueueAvailableItem(item); 
                        }
                        else 
                        { 
                            itemQueue.EnqueuePendingItem(item);
                            dispatchLater = true; 
                        }
                    }
                }
                else // queueState == QueueState.Closed || queueState == QueueState.Shutdown 
                {
                    disposeItem = true; 
                } 
            }
 
            if (waiters != null)
            {
                if (canDispatchOnThisThread)
                { 
                    CompleteWaiters(itemAvailable, waiters);
                } 
                else 
                {
                    CompleteWaitersLater(itemAvailable, waiters); 
                }
            }

            if (reader != null) 
            {
                InvokeDequeuedCallback(item.DequeuedCallback); 
                reader.Set(item); 
            }
 
            if (dispatchLater)
            {
                if (onDispatchCallback == null)
                { 
                    onDispatchCallback = new Action(OnDispatchCallback);
                } 
 
                ActionItem.Schedule(onDispatchCallback, this);
            } 
            else if (disposeItem)
            {
                InvokeDequeuedCallback(item.DequeuedCallback);
                DisposeItem(item); 
            }
        } 
 
        // This will not block, however, Dispatch() must be called later if this function
        // returns true. 
        bool EnqueueWithoutDispatch(Item item)
        {
            lock (ThisLock)
            { 
                // Open
                if (queueState != QueueState.Closed && queueState != QueueState.Shutdown) 
                { 
                    if (readerQueue.Count == 0 && waiterList.Count == 0)
                    { 
                        itemQueue.EnqueueAvailableItem(item);
                        return false;
                    }
                    else 
                    {
                        itemQueue.EnqueuePendingItem(item); 
                        return true; 
                    }
                } 
            }

            DisposeItem(item);
            InvokeDequeuedCallbackLater(item.DequeuedCallback); 
            return false;
        } 
 
        void GetWaiters(out IQueueWaiter[] waiters)
        { 
            if (waiterList.Count > 0)
            {
                waiters = waiterList.ToArray();
                waiterList.Clear(); 
            }
            else 
            { 
                waiters = null;
            } 
        }

        // Used for timeouts. The InputQueue must remove readers from its reader queue to prevent
        // dispatching items to timed out readers. 
        bool RemoveReader(IQueueReader reader)
        { 
            Fx.Assert(reader != null, "InputQueue.RemoveReader: (reader != null)"); 

            lock (ThisLock) 
            {
                if (queueState == QueueState.Open || queueState == QueueState.Shutdown)
                {
                    bool removed = false; 

                    for (int i = readerQueue.Count; i > 0; i--) 
                    { 
                        IQueueReader temp = readerQueue.Dequeue();
                        if (object.ReferenceEquals(temp, reader)) 
                        {
                            removed = true;
                        }
                        else 
                        {
                            readerQueue.Enqueue(temp); 
                        } 
                    }
 
                    return removed;
                }
            }
 
            return false;
        } 
 
        enum QueueState
        { 
            Open,
            Shutdown,
            Closed
        } 

        interface IQueueReader 
        { 
            void Set(Item item);
        } 

        interface IQueueWaiter
        {
            void Set(bool itemAvailable); 
        }
 
        struct Item 
        {
            Action dequeuedCallback; 
            Exception exception;
            T value;

            public Item(T value, Action dequeuedCallback) 
                : this(value, null, dequeuedCallback)
            { 
            } 

            public Item(Exception exception, Action dequeuedCallback) 
                : this(null, exception, dequeuedCallback)
            {
            }
 
            Item(T value, Exception exception, Action dequeuedCallback)
            { 
                this.value = value; 
                this.exception = exception;
                this.dequeuedCallback = dequeuedCallback; 
            }

            public Action DequeuedCallback
            { 
                get { return this.dequeuedCallback; }
            } 
 
            public Exception Exception
            { 
                get { return this.exception; }
            }

            public T Value 
            {
                get { return this.value; } 
            } 

            public T GetValue() 
            {
                if (this.exception != null)
                {
                    throw Fx.Exception.AsError(this.exception); 
                }
 
                return this.value; 
            }
        } 

        [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
        class AsyncQueueReader : AsyncResult, IQueueReader
        { 
            static Action timerCallback = new Action(AsyncQueueReader.TimerCallback);
 
            bool expired; 
            InputQueue inputQueue;
            T item; 
            IOThreadTimer timer;

            public AsyncQueueReader(InputQueue inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state) 
            {
                if (inputQueue.AsyncCallbackGenerator != null) 
                { 
                    base.VirtualCallback = inputQueue.AsyncCallbackGenerator();
                } 
                this.inputQueue = inputQueue;
                if (timeout != TimeSpan.MaxValue)
                {
                    this.timer = new IOThreadTimer(timerCallback, this, false); 
                    this.timer.Set(timeout);
                } 
            } 

            [Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")] 
            public static bool End(IAsyncResult result, out T value)
            {
                AsyncQueueReader readerResult = AsyncResult.End(result);
 
                if (readerResult.expired)
                { 
                    value = default(T); 
                    return false;
                } 
                else
                {
                    value = readerResult.item;
                    return true; 
                }
            } 
 
            public void Set(Item item)
            { 
                this.item = item.Value;
                if (this.timer != null)
                {
                    this.timer.Cancel(); 
                }
                Complete(false, item.Exception); 
            } 

            static void TimerCallback(object state) 
            {
                AsyncQueueReader thisPtr = (AsyncQueueReader)state;
                if (thisPtr.inputQueue.RemoveReader(thisPtr))
                { 
                    thisPtr.expired = true;
                    thisPtr.Complete(false); 
                } 
            }
        } 

        [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
        class AsyncQueueWaiter : AsyncResult, IQueueWaiter
        { 
            static Action timerCallback = new Action(AsyncQueueWaiter.TimerCallback);
            bool itemAvailable; 
 
            [Fx.Tag.SynchronizationObject(Blocking = false)]
            object thisLock = new object(); 

            IOThreadTimer timer;

            public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) 
            {
                if (timeout != TimeSpan.MaxValue) 
                { 
                    this.timer = new IOThreadTimer(timerCallback, this, false);
                    this.timer.Set(timeout); 
                }
            }

            object ThisLock 
            {
                get 
                { 
                    return this.thisLock;
                } 
            }

            [Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")]
            public static bool End(IAsyncResult result) 
            {
                AsyncQueueWaiter waiterResult = AsyncResult.End(result); 
                return waiterResult.itemAvailable; 
            }
 
            public void Set(bool itemAvailable)
            {
                bool timely;
 
                lock (ThisLock)
                { 
                    timely = (this.timer == null) || this.timer.Cancel(); 
                    this.itemAvailable = itemAvailable;
                } 

                if (timely)
                {
                    Complete(false); 
                }
            } 
 
            static void TimerCallback(object state)
            { 
                AsyncQueueWaiter thisPtr = (AsyncQueueWaiter)state;
                thisPtr.Complete(false);
            }
        } 

        class ItemQueue 
        { 
            int head;
            Item[] items; 
            int pendingCount;
            int totalCount;

            public ItemQueue() 
            {
                this.items = new Item[1]; 
            } 

            public bool HasAnyItem 
            {
                get { return this.totalCount > 0; }
            }
 
            public bool HasAvailableItem
            { 
                get { return this.totalCount > this.pendingCount; } 
            }
 
            public int ItemCount
            {
                get { return this.totalCount; }
            } 

            public Item DequeueAnyItem() 
            { 
                if (this.pendingCount == this.totalCount)
                { 
                    this.pendingCount--;
                }
                return DequeueItemCore();
            } 

            public Item DequeueAvailableItem() 
            { 
                Fx.AssertAndThrow(this.totalCount != this.pendingCount, "ItemQueue does not contain any available items");
                return DequeueItemCore(); 
            }

            public void EnqueueAvailableItem(Item item)
            { 
                EnqueueItemCore(item);
            } 
 
            public void EnqueuePendingItem(Item item)
            { 
                EnqueueItemCore(item);
                this.pendingCount++;
            }
 
            public void MakePendingItemAvailable()
            { 
                Fx.AssertAndThrow(this.pendingCount != 0, "ItemQueue does not contain any pending items"); 
                this.pendingCount--;
            } 

            Item DequeueItemCore()
            {
                Fx.AssertAndThrow(totalCount != 0, "ItemQueue does not contain any items"); 
                Item item = this.items[this.head];
                this.items[this.head] = new Item(); 
                this.totalCount--; 
                this.head = (this.head + 1) % this.items.Length;
                return item; 
            }

            void EnqueueItemCore(Item item)
            { 
                if (this.totalCount == this.items.Length)
                { 
                    Item[] newItems = new Item[this.items.Length * 2]; 
                    for (int i = 0; i < this.totalCount; i++)
                    { 
                        newItems[i] = this.items[(head + i) % this.items.Length];
                    }
                    this.head = 0;
                    this.items = newItems; 
                }
                int tail = (this.head + this.totalCount) % this.items.Length; 
                this.items[tail] = item; 
                this.totalCount++;
            } 
        }

        [Fx.Tag.SynchronizationObject(Blocking = false)]
        [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")] 
        class WaitQueueReader : IQueueReader
        { 
            Exception exception; 
            InputQueue inputQueue;
            T item; 

            [Fx.Tag.SynchronizationObject]
            ManualResetEvent waitEvent;
 
            public WaitQueueReader(InputQueue inputQueue)
            { 
                this.inputQueue = inputQueue; 
                waitEvent = new ManualResetEvent(false);
            } 

            public void Set(Item item)
            {
                lock (this) 
                {
                    Fx.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)"); 
                    Fx.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)"); 

                    this.exception = item.Exception; 
                    this.item = item.Value;
                    waitEvent.Set();
                }
            } 

            [Fx.Tag.Blocking(CancelMethod = "Set")] 
            public bool Wait(TimeSpan timeout, out T value) 
            {
                bool isSafeToClose = false; 
                try
                {
                    if (!TimeoutHelper.WaitOne(waitEvent, timeout))
                    { 
                        if (this.inputQueue.RemoveReader(this))
                        { 
                            value = default(T); 
                            isSafeToClose = true;
                            return false; 
                        }
                        else
                        {
                            waitEvent.WaitOne(); 
                        }
                    } 
 
                    isSafeToClose = true;
                } 
                finally
                {
                    if (isSafeToClose)
                    { 
                        waitEvent.Close();
                    } 
                } 

                if (this.exception != null) 
                {
                    throw Fx.Exception.AsError(this.exception);
                }
 
                value = item;
                return true; 
            } 
        }
 
        [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")]
        class WaitQueueWaiter : IQueueWaiter
        {
            bool itemAvailable; 

            [Fx.Tag.SynchronizationObject] 
            ManualResetEvent waitEvent; 

            public WaitQueueWaiter() 
            {
                waitEvent = new ManualResetEvent(false);
            }
 
            public void Set(bool itemAvailable)
            { 
                lock (this) 
                {
                    this.itemAvailable = itemAvailable; 
                    waitEvent.Set();
                }
            }
 
            [Fx.Tag.Blocking(CancelMethod = "Set")]
            public bool Wait(TimeSpan timeout) 
            { 
                if (!TimeoutHelper.WaitOne(waitEvent, timeout))
                { 
                    return false;
                }

                return this.itemAvailable; 
            }
        } 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.

                        

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK