Code:
/ Net / Net / 3.5.50727.3053 / DEVDIV / depot / DevDiv / releases / whidbey / netfxsp / ndp / fx / src / Net / System / Net / connectionpool.cs / 1 / connectionpool.cs
//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------------------------

namespace System.Net {

    using System;
    using System.Net.Sockets;
    using System.Collections;
    using System.Diagnostics;
    using System.Globalization;
    using System.Runtime.InteropServices;
    using System.Security;
    using System.Security.Permissions;
    using System.Threading;

    /// <summary>
    /// Callback handed to the pool by asynchronous callers. The pool invokes it with the
    /// owning object as <c>request</c>; on failure <c>state</c> carries the exception.
    /// </summary>
    internal delegate void GeneralAsyncDelegate(object request, object state);

    /// <summary>
    /// Factory the pool calls (passing itself) whenever it needs a brand new PooledStream.
    /// </summary>
    internal delegate PooledStream CreateConnectionDelegate(ConnectionPool pool);

    /// <summary>
    /// Pools PooledStream objects produced by a CreateConnectionDelegate.
    /// </summary>
    internal class ConnectionPool {

        // Lifecycle of the pool. The constructor sets Initializing and Initialize() switches
        // to Running; the visible code only ever compares against ShuttingDown.
        private enum State {
            Initializing,
            Running,
            ShuttingDown,
        }

        // Timer callbacks are cached in statics so each timer reuses the same delegate.
        private static TimerThread.Callback s_CleanupCallback = new TimerThread.Callback(CleanupCallbackWrapper);
        private static TimerThread.Callback s_CancelErrorCallback = new TimerThread.Callback(CancelErrorCallbackWrapper);

        // Shared timer queue (period ErrorWait) used to clear the pool's error state after an Abort().
        private static TimerThread.Queue s_CancelErrorQueue = TimerThread.GetOrCreateQueue(ErrorWait);

        // Maximum count given to the free-object semaphore created in Initialize().
        private const int MaxQueueSize = (int)0x00100000;

        // The order of these is important; we want the WaitAny call to be signaled
        // for a free object before a creation signal. Only the index of the first signaled
        // object is returned from the WaitAny call.
private const int SemaphoreHandleIndex = (int)0x0;   // m_WaitHandles slot: free-object semaphore
private const int ErrorHandleIndex = (int)0x1;       // m_WaitHandles slot: error event
private const int CreationHandleIndex = (int)0x2;    // m_WaitHandles slot: creation mutex

// Compared against the WaitHandle.WaitAny result in Get()/GetConnection(); 0x102 (258)
// is the value of WaitHandle.WaitTimeout.
private const int WaitTimeout = (int)0x102;

// Not referenced anywhere in the visible code.
private const int WaitAbandoned = (int)0x80;

private const int ErrorWait = 5 * 1000; // 5 seconds

// Timer queue that drives periodic cleanup; only assigned when idleTimeout > 0.
private readonly TimerThread.Queue m_CleanupQueue;

private State m_State;

// Free streams. PutNew() pushes onto m_StackNew; CleanupCallback() moves them to
// m_StackOld and destroys what it finds on m_StackOld.
private InterlockedStack m_StackOld;
private InterlockedStack m_StackNew;

// Number of callers currently waiting for a stream (incremented in GetConnection,
// decremented in Get).
private int m_WaitCount;

// [semaphore, error event, creation mutex] - see the *HandleIndex constants.
private WaitHandle[] m_WaitHandles;

// Error state: the stashed exception, the flag set by Abort(), and the timer that
// clears both again via CancelErrorCallback().
private Exception m_ResError;
private volatile bool m_ErrorOccured;
private TimerThread.Timer m_ErrorTimer;

// Every stream created by this pool (added in Create, removed in Destroy), and the
// cached element count of that list.
private ArrayList m_ObjectList;
private int m_TotalObjects;

// Pending asynchronous requests; the queue instance doubles as the lock object that
// guards both the queue and m_AsyncThread.
private Queue m_QueuedRequests;
private Thread m_AsyncThread;

private int m_MaxPoolSize;
private int m_MinPoolSize;
private ServicePoint m_ServicePoint;
private CreateConnectionDelegate m_CreateConnectionCallback;

// Typed accessors over the m_WaitHandles slots.
private Mutex CreationMutex {
    get {
        return (Mutex) m_WaitHandles[CreationHandleIndex];
    }
}

private ManualResetEvent ErrorEvent {
    get {
        return (ManualResetEvent) m_WaitHandles[ErrorHandleIndex];
    }
}

private Semaphore Semaphore {
    get {
        return (Semaphore) m_WaitHandles[SemaphoreHandleIndex];
    }
}

// ConnectionPool implements basic connection pooling by pooling PooledStreams.

/// <summary>
/// Constructor - binds the pool to a servicePoint and, when idleTimeout is positive,
/// sets up a recurring cleanup timer (period idleTimeout / 2) to clean up idle connections.
/// </summary>
/// <param name="servicePoint">ServicePoint this pool belongs to.</param>
/// <param name="maxPoolSize">Upper bound on pooled streams; 0 is treated as "no limit" by the creation checks.</param>
/// <param name="minPoolSize">Size below which the cleanup timer stops destroying idle streams.</param>
/// <param name="idleTimeout">Idle period; NOTE(review): units are whatever TimerThread.GetOrCreateQueue expects - confirm.</param>
/// <param name="createConnectionCallback">Factory used to create new streams.</param>
internal ConnectionPool(ServicePoint servicePoint, int maxPoolSize, int minPoolSize, int idleTimeout, CreateConnectionDelegate createConnectionCallback) : base() {
    m_State                    = State.Initializing;

    m_CreateConnectionCallback = createConnectionCallback;
    m_MaxPoolSize = maxPoolSize;
    m_MinPoolSize = minPoolSize;
    m_ServicePoint = servicePoint;

    Initialize();

    if (idleTimeout > 0) {
        m_CleanupQueue = TimerThread.GetOrCreateQueue(idleTimeout / 2);
        m_CleanupQueue.CreateTimer(s_CleanupCallback, this);
    }
}

/// <summary>
/// Internal init stuff: creates the stacks, the request queue and the wait handles,
/// then marks the pool as Running.
/// </summary>
private void Initialize() {
    m_StackOld          = new InterlockedStack();
    m_StackNew          = new InterlockedStack();

    m_QueuedRequests = new Queue();

    m_WaitHandles     = new WaitHandle[3];
    m_WaitHandles[SemaphoreHandleIndex] = new Semaphore(0, MaxQueueSize);
    m_WaitHandles[ErrorHandleIndex]     = new ManualResetEvent(false);
    m_WaitHandles[CreationHandleIndex]  = new Mutex();

    m_ErrorTimer         = null;  // No error yet.

    m_ObjectList            = new ArrayList();
    // Flipped last, after every other member has been created.
    m_State = State.Running;
}

/// <summary>
/// Async state object, used for storing state on async calls.
/// </summary>
private class AsyncConnectionPoolRequest {
    public AsyncConnectionPoolRequest(ConnectionPool pool, object owningObject, GeneralAsyncDelegate asyncCallback, int creationTimeout) {
        Pool = pool;
        OwningObject = owningObject;
        AsyncCallback = asyncCallback;
        CreationTimeout = creationTimeout;
    }
    public object OwningObject;
    public GeneralAsyncDelegate AsyncCallback;
    public bool Completed;          // never read or written by the visible code
    public ConnectionPool Pool;
    public int CreationTimeout;
}

/// <summary>
/// Enqueues an asynchronous request and, if no worker thread is currently registered,
/// starts a background thread running AsyncThread to service the queue.
/// </summary>
private void QueueRequest(AsyncConnectionPoolRequest asyncRequest) {
    // Enqueue and thread start-up happen under the same lock the worker takes before
    // it clears m_AsyncThread.
    lock(m_QueuedRequests) {
        m_QueuedRequests.Enqueue(asyncRequest);
        if (m_AsyncThread == null) {
            m_AsyncThread = new Thread(new ThreadStart(AsyncThread));
            m_AsyncThread.IsBackground = true;
            m_AsyncThread.Start();
        }
    }
}

// Queues an AsyncConnectionPoolRequest to our queue of requests needing
// a pooled stream.
// If an AsyncThread is not created, we create one,
// and let it process the queued items

/// <summary>
/// Body of the background worker started by QueueRequest. It drains the queue of
/// asynchronous requests: for each one it waits (up to the request's CreationTimeout)
/// on the pool's wait handles, obtains a stream through Get and activates it with the
/// request's callback. Any failure is reported to the request's callback instead of
/// being thrown. Once the queue is empty the thread lingers for 500 ms and then, if the
/// queue is still empty, unregisters itself and exits.
/// </summary>
private void AsyncThread() {
    for (;;) {
        for (;;) {
            AsyncConnectionPoolRequest asyncState = null;
            bool dequeued = false;

            // Test and dequeue under the same lock QueueRequest uses to enqueue; Queue
            // is not safe for an unsynchronized Count read while another thread writes.
            lock (m_QueuedRequests) {
                if (m_QueuedRequests.Count > 0) {
                    asyncState = (AsyncConnectionPoolRequest) m_QueuedRequests.Dequeue();
                    dequeued = true;
                }
            }
            if (!dequeued) {
                break;
            }

            bool continueLoop = true;
            WaitHandle[] localWaitHandles = m_WaitHandles;
            PooledStream pooledStream = null;
            try {
                while ((pooledStream == null) && continueLoop) {
                    int result = WaitHandle.WaitAny(localWaitHandles, asyncState.CreationTimeout, false);
                    pooledStream = Get(asyncState.OwningObject, result, ref continueLoop, ref localWaitHandles);
                }

                // Get told us to stop waiting without handing back a stream. Report it
                // the same way the synchronous path of GetConnection does rather than
                // letting a null dereference surface as NullReferenceException.
                if (pooledStream == null) {
                    throw new InternalException();
                }

                pooledStream.Activate(asyncState.OwningObject, asyncState.AsyncCallback);
            }
            catch (Exception e) {
                FailAsyncRequest(asyncState, pooledStream, e);
            }
            catch {
                FailAsyncRequest(asyncState, pooledStream, new Exception(SR.GetString(SR.net_nonClsCompliantException)));
            }
        }

        // Linger briefly so a burst of requests can reuse this thread.
        Thread.Sleep(500);

        lock (m_QueuedRequests) {
            // Only unregister while holding the lock, so QueueRequest either sees this
            // thread (and we see its item) or starts a fresh one.
            if (m_QueuedRequests.Count == 0) {
                m_AsyncThread = null;
                break;
            }
        }
    }
}

/// <summary>
/// Failure path shared by AsyncThread's catch handlers: closes and returns the stream
/// (if one was obtained) and then hands the error to the request's callback.
/// </summary>
private void FailAsyncRequest(AsyncConnectionPoolRequest asyncState, PooledStream pooledStream, Exception error) {
    if (pooledStream != null) {
        pooledStream.Close();
        PutConnection(pooledStream, asyncState.OwningObject, asyncState.CreationTimeout);
    }
    asyncState.AsyncCallback(asyncState.OwningObject, error);
}

// Processes async queued requests that are blocked on needing a free pooled stream
// works as follows:
// 1. while there are blocked requests, take one out of the queue
// 2. Wait for a free connection, when one becomes avail, then notify the request that its there
// 3. repeat 1 until there are no more queued requests
// 4.
// if there are no more requests waiting for a free stream, then close down this thread

/// <summary>
/// Count of total pooled streams associated with this pool, including streams that are being used.
/// </summary>
internal int Count {
    get {
        return(m_TotalObjects);
    }
}

/// <summary>
/// Our ServicePoint, used for IP resolution.
/// </summary>
internal ServicePoint ServicePoint {
    get {
        return m_ServicePoint;
    }
}

/// <summary>
/// Our max size of outstanding pooled streams.
/// </summary>
internal int MaxPoolSize {
    get {
        return m_MaxPoolSize;
    }
}

/// <summary>
/// Our min size of the pool to remove idled items down to.
/// </summary>
internal int MinPoolSize {
    get {
        return m_MinPoolSize;
    }
}

/// <summary>
/// An error occurred, usually due to an abort.
/// </summary>
private bool ErrorOccurred {
    get {
        return m_ErrorOccured;
    }
}

/// <summary>
/// Static timer entry point: runs the pool's CleanupCallback and then always re-arms
/// the cleanup timer on the pool's cleanup queue, so cleanup keeps recurring even if
/// a pass throws.
/// </summary>
private static void CleanupCallbackWrapper(TimerThread.Timer timer, int timeNoticed, object context) {
    ConnectionPool pThis = (ConnectionPool) context;

    try {
        pThis.CleanupCallback();
    }
    finally {
        pThis.m_CleanupQueue.CreateTimer(s_CleanupCallback, context);
    }
}

/// <summary>
/// This is called by a timer, to check for needed cleanup of idle pooled streams.
/// </summary>
private void CleanupCallback() {
    // Called when the cleanup-timer ticks over.
    //
    // This is the automatic pruning method.  Every period, we will perform a two-step
    // process.  First, for the objects above MinPool, we will obtain the semaphore for
    // the object and then destroy it if it was on the old stack.  We will continue this
    // until we either reach MinPool size, or we are unable to obtain a free object, or
    // until we have exhausted all the objects on the old stack.  After that, push all
    // objects on the new stack to the old stack.  So, every period the objects on the
    // old stack are destroyed and the objects on the new stack are pushed to the old
    // stack.  All objects that are currently out and in use are not on either stack.
    // With this logic, an object is pruned if unused for at least one period but not
    // more than two periods.

    // Destroy free objects above MinPool size from old stack.
    while(Count > MinPoolSize) { // While above MinPoolSize...

        if (Semaphore.WaitOne(0, false) ) { // != WaitTimeout
            // We obtained an object from the semaphore.
            PooledStream pooledStream = (PooledStream) m_StackOld.Pop();

            if (null != pooledStream) {
                // If we obtained one from the old stack, destroy it.
                Destroy(pooledStream);
            }
            else {
                // Else we exhausted the old stack, so break. Give the semaphore count
                // back: the free object it represents is still on the new stack.
                Semaphore.ReleaseSemaphore();
                break;
            }
        }
        else break;
    }

    // Push to the old-stack.  For each free object, move object from new stack
    // to old stack.
    if(Semaphore.WaitOne(0, false)) { // != WaitTimeout
        for(;;) {
            PooledStream pooledStream = (PooledStream) m_StackNew.Pop();

            if (null == pooledStream)
                break;

            GlobalLog.Assert(!pooledStream.IsEmancipated, "Pooled object not in pool.");
            GlobalLog.Assert(pooledStream.CanBePooled, "Pooled object is not poolable.");

            m_StackOld.Push(pooledStream);
        }
        Semaphore.ReleaseSemaphore();
    }
}

/// <summary>
/// Creates a new PooledStream through the supplied factory and registers it with the pool.
/// A null or non-poolable result, or any exception from the factory, is not propagated:
/// the error is stashed in m_ResError, the pool is put into the error state via Abort(),
/// and null is returned.
/// </summary>
private PooledStream Create(CreateConnectionDelegate createConnectionCallback) {
    GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::Create");
    PooledStream newObj = null;

    try {
        newObj = createConnectionCallback(this);

        if (null == newObj)
            throw new InternalException();    // Create succeeded, but null object

        if (!newObj.CanBePooled)
            throw new InternalException();    // Create succeeded, but non-poolable object

        newObj.PrePush(null);

        lock (m_ObjectList.SyncRoot) {
            m_ObjectList.Add(newObj);
            m_TotalObjects = m_ObjectList.Count;
        }

        GlobalLog.Print("Create pooledStream#"+ValidationHelper.HashString(newObj));
    }
    catch(Exception e)  {
        GlobalLog.Print("Pool Exception: " + e.Message);

        newObj = null; // set to null, so we do not return bad new object
        // Failed to create instance
        m_ResError = e;
        Abort();
    }
    catch {
        GlobalLog.Print("Pool Exception: Non-CLS Compliant Exception");

        newObj = null; // set to null, so we do not return bad new object
        // Failed to create instance
        m_ResError = new Exception(SR.GetString(SR.net_nonClsCompliantException));
        Abort();
    }
    GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Create",ValidationHelper.HashString(newObj));
    return newObj;
}

/// <summary>
/// Destroys a pooled stream from the pool: removes it from the object list, refreshes
/// the cached count and then destroys the stream itself (even if the removal throws).
/// </summary>
private void Destroy(PooledStream pooledStream) {
    GlobalLog.Print("Destroy pooledStream#"+ValidationHelper.HashString(pooledStream));

    try {
        lock (m_ObjectList.SyncRoot) {
            m_ObjectList.Remove(pooledStream);
            m_TotalObjects = m_ObjectList.Count;
        }
    }
    finally {
        if (null != pooledStream) {
            pooledStream.Destroy();
        }
    }
}

/// <summary>
/// Static timer entry point that forwards to the pool's CancelErrorCallback.
/// </summary>
private static void CancelErrorCallbackWrapper(TimerThread.Timer timer, int timeNoticed, object context) {
    ((ConnectionPool) context).CancelErrorCallback();
}

/// <summary>
/// Called on error, after we waited a set amount of time from aborting; also called
/// directly once no requests are waiting. Clears the error state, but only for the
/// caller that successfully cancels the current error timer.
/// </summary>
private void CancelErrorCallback() {
    TimerThread.Timer timer = m_ErrorTimer;
    if (timer != null && timer.Cancel()) {
        m_ErrorOccured = false;
        ErrorEvent.Reset();
        m_ErrorTimer = null;
        m_ResError = null;
    }
}

/// <summary>
/// Retrieves a pooled stream from the pool proper. This works by first attempting to
/// find something in the pool on the New stack and then trying the Old stack if nothing
/// is available there. Returns null (after asserting) if both stacks are empty.
/// </summary>
private PooledStream GetFromPool(object owningObject) {
    PooledStream res = null;
    GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetFromPool");
    res = (PooledStream) m_StackNew.Pop();
    if (null == res) {
        res = (PooledStream) m_StackOld.Pop();
    }

    // Shouldn't be null, we could assert here.
    GlobalLog.Assert(res != null, "GetFromPool called with nothing in the pool!");

    if (null != res) {
        res.PostPop(owningObject);
        GlobalLog.Print("GetFromGeneralPool pooledStream#"+ValidationHelper.HashString(res));
    }

    GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetFromPool",ValidationHelper.HashString(res));
    return(res);
}

/// <summary>
/// Retrieves the pooled stream out of the pool. Does this by using the result of a
/// WaitAny as input, and then based on whether it was the mutex, event, semaphore,
/// or a timeout decides what action to take.
/// </summary>
/// <param name="owningObject">Object the returned stream is handed to (passed to PostPop).</param>
/// <param name="result">Value returned by WaitHandle.WaitAny on <paramref name="waitHandles"/>.</param>
/// <param name="continueLoop">Set to false when the caller should stop waiting; left true when it should wait again.</param>
/// <param name="waitHandles">May be replaced by a two-element array (semaphore, error event) once the pool is full, so the caller no longer waits on the creation mutex.</param>
/// <returns>A stream, or null if the caller has to wait again.</returns>
/// <exception cref="WebException">The wait timed out, or (typically) the pool is in the error state.</exception>
private PooledStream Get(object owningObject, int result, ref bool continueLoop, ref WaitHandle [] waitHandles) {
    PooledStream pooledStream = null;
    GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get", result.ToString());

    // From the WaitAny docs: "If more than one object became signaled during
    // the call, this is the array index of the signaled object with the
    // smallest index value of all the signaled objects."  This is important
    // so that the free object signal will be returned before a creation
    // signal.

    switch (result) {
    case WaitTimeout:
        Interlocked.Decrement(ref m_WaitCount);
        continueLoop = false;
        GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get","throw Timeout WebException");
        throw new WebException(NetRes.GetWebStatusString("net_timeout", WebExceptionStatus.ConnectFailure), WebExceptionStatus.Timeout);

    case ErrorHandleIndex:
        // Throw the error that Create/Abort stashed in m_ResError.
        //
        // Capture it BEFORE giving up our wait-count slot: once the count can reach
        // zero, PutConnection on another thread may run CancelErrorCallback and null
        // the field out.
        Exception exceptionToThrow = m_ResError;
        int newWaitCount = Interlocked.Decrement(ref m_WaitCount);
        continueLoop = false;
        if (newWaitCount == 0) {
            CancelErrorCallback();
        }
        // The error timer can still have cleared m_ResError between the event being
        // signaled and the read above. "throw null" would surface as a
        // NullReferenceException, so fall back to the same error Abort() uses.
        if (exceptionToThrow == null) {
            exceptionToThrow = new WebException(
                NetRes.GetWebStatusString("net_requestaborted", WebExceptionStatus.RequestCanceled),
                WebExceptionStatus.RequestCanceled);
        }
        throw exceptionToThrow;

    case CreationHandleIndex:
        // We own the creation mutex here; it is released in the finally below.
        try {
            continueLoop = true;
            pooledStream = UserCreateRequest();

            if (null != pooledStream) {
                pooledStream.PostPop(owningObject);
                Interlocked.Decrement(ref m_WaitCount);
                continueLoop = false;
            }
            else {
                // If we were not able to create an object, check to see if
                // we reached MaxPoolSize.  If so, we will no longer wait on
                // the CreationHandle, but instead wait for a free object or
                // the timeout.
                if (Count >= MaxPoolSize && 0 != MaxPoolSize) {
                    if (!ReclaimEmancipatedObjects()) {
                        // modify handle array not to wait on creation mutex anymore
                        waitHandles    = new WaitHandle[2];
                        waitHandles[0] = m_WaitHandles[0];
                        waitHandles[1] = m_WaitHandles[1];
                    }
                }
            }
        }
        finally {
            CreationMutex.ReleaseMutex();
        }
        break;

    default:
        //
        //    guaranteed available inventory
        //
        Interlocked.Decrement(ref m_WaitCount);
        pooledStream = GetFromPool(owningObject);
        continueLoop = false;
        break;
    }
    GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get",ValidationHelper.HashString(pooledStream));
    return pooledStream;
}

/// <summary>
/// Aborts the queued requests to the pool: stashes a RequestCanceled error if none is
/// pending, signals the error event, sets the error flag and starts the timer that
/// clears the error state again.
/// </summary>
internal void Abort() {
    if (m_ResError == null) {
        m_ResError = new WebException(
                NetRes.GetWebStatusString("net_requestaborted", WebExceptionStatus.RequestCanceled),
                WebExceptionStatus.RequestCanceled);
    }
    ErrorEvent.Set();
    m_ErrorOccured = true;
    m_ErrorTimer = s_CancelErrorQueue.CreateTimer(s_CancelErrorCallback, this);
}

/// <summary>
/// Attempts to obtain a PooledStream, by trying to get a pooled connection or by
/// creating a new one.
/// </summary>
/// <param name="owningObject">Object the stream is being obtained for.</param>
/// <param name="asyncCallback">Null for a synchronous (blocking) call; otherwise the request is queued for the background worker whenever no stream is available immediately.</param>
/// <param name="creationTimeout">Timeout passed to each wait for a stream.</param>
/// <returns>The activated stream, or null when the request will complete through <paramref name="asyncCallback"/>.</returns>
/// <exception cref="InternalException">The pool is not running, or a synchronous request ended without a stream.</exception>
internal PooledStream GetConnection(object owningObject, GeneralAsyncDelegate asyncCallback, int creationTimeout) {
    int result;
    PooledStream stream = null;
    bool continueLoop = true;
    bool async = asyncCallback != null;

    GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetConnection");

    if(m_State != State.Running) {
        throw new InternalException();
    }

    Interlocked.Increment(ref m_WaitCount);
    WaitHandle[] localWaitHandles = m_WaitHandles;

    if (async) {
        // Poll once without blocking; anything not immediately available goes to the worker.
        result = WaitHandle.WaitAny(localWaitHandles, 0, false);
        if (result != WaitTimeout) {
            stream = Get(owningObject, result, ref continueLoop, ref localWaitHandles);
        }
        if (stream == null) {
            GlobalLog.Print("GetConnection:"+ValidationHelper.HashString(this)+" going async");
            AsyncConnectionPoolRequest asyncState = new AsyncConnectionPoolRequest(this, owningObject, asyncCallback, creationTimeout);
            QueueRequest(asyncState);
        }
    } else {
        // loop while we don't have an error/timeout and we haven't gotten a stream yet
        while ((stream == null) && continueLoop) {
            result = WaitHandle.WaitAny(localWaitHandles, creationTimeout, false);
            stream = Get(owningObject, result, ref continueLoop, ref localWaitHandles);
        }
    }

    if (null != stream) {
        // if there is already a stream, then we're not going async
        if (!stream.IsInitalizing) {
            asyncCallback = null;
        }

        try{
            // If activate returns false, it is going to finish asynchronously
            // and therefore the stream will be returned in a callback and
            // we should not return it here (return null)
            if (stream.Activate(owningObject, asyncCallback) == false)
                stream = null;
        }
        catch{
            stream.Close();
            PutConnection(stream,owningObject,creationTimeout);
            throw;
        }
    } else if (!async) {
        throw new InternalException();
    }

    GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetConnection", ValidationHelper.HashString(stream));
    return(stream);
}

/// <summary>
/// Attempts to return a PooledStream to the pool. A stream that can no longer be pooled
/// is destroyed and, if requests are still waiting, a replacement is created under the
/// creation mutex. While the pool is shutting down the stream is simply destroyed.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="pooledStream"/> is null.</exception>
internal void PutConnection(PooledStream pooledStream, object owningObject, int creationTimeout) {
    GlobalLog.Print("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutConnection");
    if (pooledStream == null) {
        throw new ArgumentNullException("pooledStream");
    }

    pooledStream.PrePush(owningObject);

    if (m_State != State.ShuttingDown) {
        pooledStream.Deactivate();

        // cancel our error status, if we have no new requests waiting anymore
        if (m_WaitCount == 0) {
            CancelErrorCallback();
        }

        if (pooledStream.CanBePooled) {
            PutNew(pooledStream);
        }
        else {
            Destroy(pooledStream);

            // Make sure we recreate a new pooled stream, if there are requests for a stream
            // at this point
            if (m_WaitCount > 0) {
                if (!CreationMutex.WaitOne(creationTimeout, false)) {
                    Abort();
                } else {
                    try {
                        pooledStream = UserCreateRequest();
                        if (null != pooledStream) {
                            PutNew(pooledStream);
                        }
                    } finally {
                        CreationMutex.ReleaseMutex();
                    }
                }
            }
        }
    }
    else {
        // If we're shutting down, we destroy the object.
        Destroy(pooledStream);
    }
}

/// <summary>
/// Places a new/reusable stream in the new stack of the pool and releases one count
/// on the free-object semaphore.
/// </summary>
private void PutNew(PooledStream pooledStream) {
    GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutNew", "#"+ValidationHelper.HashString(pooledStream));

    GlobalLog.Assert(null != pooledStream, "Why are we adding a null object to the pool?");
    GlobalLog.Assert(pooledStream.CanBePooled, "Non-poolable object in pool.");

    m_StackNew.Push(pooledStream);
    Semaphore.ReleaseSemaphore();
    GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutNew");
}

/// <summary>
/// Reclaim any pooled streams that have seen their users/WebRequests GCed away.
/// Walks a snapshot of the object list and returns every emancipated stream to the
/// pool; streams whose monitor is currently held by someone else are skipped.
/// </summary>
/// <returns>true if at least one stream was reclaimed.</returns>
private bool ReclaimEmancipatedObjects() {
    bool emancipatedObjectFound = false;
    GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::ReclaimEmancipatedObjects");

    lock(m_ObjectList.SyncRoot) {

        // Iterate over a copy: PutConnection below may remove entries from m_ObjectList.
        object[] objectList = m_ObjectList.ToArray();
        if (null != objectList) {

            for (int i = 0; i < objectList.Length; ++i) {
                PooledStream pooledStream = (PooledStream) objectList[i];

                if (null != pooledStream) {
                    bool locked = false;

                    try {
                        locked = Monitor.TryEnter(pooledStream);

                        if (locked) {
                            if (pooledStream.IsEmancipated) {
                                GlobalLog.Print("EmancipatedObject pooledStream#"+ValidationHelper.HashString(pooledStream));
                                PutConnection(pooledStream, null, Timeout.Infinite);
                                emancipatedObjectFound = true;
                            }
                        }
                    }
                    finally {
                        if (locked)
                            Monitor.Exit(pooledStream);
                    }
                }
            }
        }
    }
    GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::ReclaimEmancipatedObjects",emancipatedObjectFound.ToString());
    return emancipatedObjectFound;
}

/// <summary>
/// Creates a new PooledStream if allowable: the pool must not be in the error state and
/// must be below MaxPoolSize (or MaxPoolSize must be 0). Returns null otherwise.
/// </summary>
private PooledStream UserCreateRequest() {
    // called by user when they were not able to obtain a free object but
    // instead obtained creation mutex
    GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::UserCreateRequest");

    PooledStream pooledStream = null;

    if (!ErrorOccurred) {
        if (Count < MaxPoolSize || 0 == MaxPoolSize) {
            // With an odd number of total objects we create straight away. With an
            // even number we first try to reclaim dead objects and only create a new
            // one if nothing could be reclaimed.
            if ((Count & 0x1) == 0x1 || !ReclaimEmancipatedObjects())
                pooledStream = Create(m_CreateConnectionCallback);
        }
    }
    GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::UserCreateRequest", ValidationHelper.HashString(pooledStream));
    return pooledStream;
}
} // end of class ConnectionPool

/// <summary>
/// Stack of pooled objects whose operations lock on the inner Stack's SyncRoot.
/// </summary>
sealed internal class InterlockedStack {
    private readonly Stack _stack = new Stack();
    private int _count;     // mirrors _stack.Count; only read by the DEBUG asserts

#if DEBUG
    // DEBUG only: remembers what is currently on the stack to detect double pushes.
    private readonly Hashtable doublepush = new Hashtable();
#endif

    internal InterlockedStack() {
    }

    /// <summary>
    /// Pushes an object onto the stack.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="pooledStream"/> is null.</exception>
    internal void Push(Object pooledStream) {
        GlobalLog.Assert(null != pooledStream, "push null");
        if (null == pooledStream) { throw new ArgumentNullException("pooledStream"); }
        lock(_stack.SyncRoot) {
#if DEBUG
            GlobalLog.Assert(null == doublepush[pooledStream], "object already in stack");
            doublepush[pooledStream] = _stack.Count;
#endif
            _stack.Push(pooledStream);
#if DEBUG
            GlobalLog.Assert(_count+1 == _stack.Count, "push count mishandle");
#endif
            _count = _stack.Count;
        }
    }
/// <summary>
/// Pops the top object off the stack, or returns null when the stack is empty.
/// The emptiness check and the pop happen under the lock on the inner Stack's SyncRoot.
/// </summary>
internal Object Pop() {
    lock(_stack.SyncRoot) {
        object pooledStream = null;
        if (0 <_stack.Count) {
            pooledStream = _stack.Pop();
#if DEBUG
            GlobalLog.Assert(_count-1 == _stack.Count, "pop count mishandle");
            doublepush.Remove(pooledStream);
#endif
            _count = _stack.Count;
        }
        return pooledStream;
    }
}
} // end of class InterlockedStack (used to pool streams in a thread safe manner)
} // end of namespace System.Net

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.

// NOTE(review): everything from here on repeats the definitions above - this is a second
// copy of the same source text. Keep only one copy if this file is ever compiled.

//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------------------------

namespace System.Net {

    using System;
    using System.Net.Sockets;
    using System.Collections;
    using System.Diagnostics;
    using System.Globalization;
    using System.Runtime.InteropServices;
    using System.Security;
    using System.Security.Permissions;
    using System.Threading;

    /// <summary>
    /// Callback handed to the pool by asynchronous callers. The pool invokes it with the
    /// owning object as <c>request</c>; on failure <c>state</c> carries the exception.
    /// </summary>
    internal delegate void GeneralAsyncDelegate(object request, object state);

    /// <summary>
    /// Factory the pool calls (passing itself) whenever it needs a brand new PooledStream.
    /// </summary>
    internal delegate PooledStream CreateConnectionDelegate(ConnectionPool pool);

    /// <summary>
    /// Pools PooledStream objects produced by a CreateConnectionDelegate.
    /// </summary>
    internal class ConnectionPool {

        // Lifecycle of the pool. The constructor sets Initializing and Initialize() switches
        // to Running; the visible code only ever compares against ShuttingDown.
        private enum State {
            Initializing,
            Running,
            ShuttingDown,
        }

        // Timer callbacks are cached in statics so each timer reuses the same delegate.
        private static TimerThread.Callback s_CleanupCallback = new TimerThread.Callback(CleanupCallbackWrapper);
        private static TimerThread.Callback s_CancelErrorCallback = new TimerThread.Callback(CancelErrorCallbackWrapper);

        // Shared timer queue (period ErrorWait) used to clear the pool's error state after an Abort().
        private static TimerThread.Queue s_CancelErrorQueue = TimerThread.GetOrCreateQueue(ErrorWait);

        // Maximum count given to the free-object semaphore created in Initialize().
        private const int MaxQueueSize = (int)0x00100000;

        // The order of these is important; we want the WaitAny call to be signaled
        // for a free object before a creation signal. Only the index of the first signaled
        // object is returned from the WaitAny call.
private const int SemaphoreHandleIndex = (int)0x0;   // m_WaitHandles slot: free-object semaphore
private const int ErrorHandleIndex = (int)0x1;       // m_WaitHandles slot: error event
private const int CreationHandleIndex = (int)0x2;    // m_WaitHandles slot: creation mutex

// Compared against the WaitHandle.WaitAny result in Get()/GetConnection(); 0x102 (258)
// is the value of WaitHandle.WaitTimeout.
private const int WaitTimeout = (int)0x102;

// Not referenced anywhere in the visible code.
private const int WaitAbandoned = (int)0x80;

private const int ErrorWait = 5 * 1000; // 5 seconds

// Timer queue that drives periodic cleanup; only assigned when idleTimeout > 0.
private readonly TimerThread.Queue m_CleanupQueue;

private State m_State;

// Free streams. PutNew() pushes onto m_StackNew; CleanupCallback() moves them to
// m_StackOld and destroys what it finds on m_StackOld.
private InterlockedStack m_StackOld;
private InterlockedStack m_StackNew;

// Number of callers currently waiting for a stream (incremented in GetConnection,
// decremented in Get).
private int m_WaitCount;

// [semaphore, error event, creation mutex] - see the *HandleIndex constants.
private WaitHandle[] m_WaitHandles;

// Error state: the stashed exception, the flag set by Abort(), and the timer that
// clears both again via CancelErrorCallback().
private Exception m_ResError;
private volatile bool m_ErrorOccured;
private TimerThread.Timer m_ErrorTimer;

// Every stream created by this pool (added in Create, removed in Destroy), and the
// cached element count of that list.
private ArrayList m_ObjectList;
private int m_TotalObjects;

// Pending asynchronous requests; the queue instance doubles as the lock object that
// guards both the queue and m_AsyncThread.
private Queue m_QueuedRequests;
private Thread m_AsyncThread;

private int m_MaxPoolSize;
private int m_MinPoolSize;
private ServicePoint m_ServicePoint;
private CreateConnectionDelegate m_CreateConnectionCallback;

// Typed accessors over the m_WaitHandles slots.
private Mutex CreationMutex {
    get {
        return (Mutex) m_WaitHandles[CreationHandleIndex];
    }
}

private ManualResetEvent ErrorEvent {
    get {
        return (ManualResetEvent) m_WaitHandles[ErrorHandleIndex];
    }
}

private Semaphore Semaphore {
    get {
        return (Semaphore) m_WaitHandles[SemaphoreHandleIndex];
    }
}

// ConnectionPool implements basic connection pooling by pooling PooledStreams.

/// <summary>
/// Constructor - binds the pool to a servicePoint and, when idleTimeout is positive,
/// sets up a recurring cleanup timer (period idleTimeout / 2) to clean up idle connections.
/// </summary>
/// <param name="servicePoint">ServicePoint this pool belongs to.</param>
/// <param name="maxPoolSize">Upper bound on pooled streams; 0 is treated as "no limit" by the creation checks.</param>
/// <param name="minPoolSize">Size below which the cleanup timer stops destroying idle streams.</param>
/// <param name="idleTimeout">Idle period; NOTE(review): units are whatever TimerThread.GetOrCreateQueue expects - confirm.</param>
/// <param name="createConnectionCallback">Factory used to create new streams.</param>
internal ConnectionPool(ServicePoint servicePoint, int maxPoolSize, int minPoolSize, int idleTimeout, CreateConnectionDelegate createConnectionCallback) : base() {
    m_State                    = State.Initializing;

    m_CreateConnectionCallback = createConnectionCallback;
    m_MaxPoolSize = maxPoolSize;
    m_MinPoolSize = minPoolSize;
    m_ServicePoint = servicePoint;

    Initialize();

    if (idleTimeout > 0) {
        m_CleanupQueue = TimerThread.GetOrCreateQueue(idleTimeout / 2);
        m_CleanupQueue.CreateTimer(s_CleanupCallback, this);
    }
}

/// <summary>
/// Internal init stuff: creates the stacks, the request queue and the wait handles,
/// then marks the pool as Running.
/// </summary>
private void Initialize() {
    m_StackOld          = new InterlockedStack();
    m_StackNew          = new InterlockedStack();

    m_QueuedRequests = new Queue();

    m_WaitHandles     = new WaitHandle[3];
    m_WaitHandles[SemaphoreHandleIndex] = new Semaphore(0, MaxQueueSize);
    m_WaitHandles[ErrorHandleIndex]     = new ManualResetEvent(false);
    m_WaitHandles[CreationHandleIndex]  = new Mutex();

    m_ErrorTimer         = null;  // No error yet.

    m_ObjectList            = new ArrayList();
    // Flipped last, after every other member has been created.
    m_State = State.Running;
}

/// <summary>
/// Async state object, used for storing state on async calls.
/// </summary>
private class AsyncConnectionPoolRequest {
    public AsyncConnectionPoolRequest(ConnectionPool pool, object owningObject, GeneralAsyncDelegate asyncCallback, int creationTimeout) {
        Pool = pool;
        OwningObject = owningObject;
        AsyncCallback = asyncCallback;
        CreationTimeout = creationTimeout;
    }
    public object OwningObject;
    public GeneralAsyncDelegate AsyncCallback;
    public bool Completed;          // never read or written by the visible code
    public ConnectionPool Pool;
    public int CreationTimeout;
}

/// <summary>
/// Enqueues an asynchronous request and, if no worker thread is currently registered,
/// starts a background thread running AsyncThread to service the queue.
/// </summary>
private void QueueRequest(AsyncConnectionPoolRequest asyncRequest) {
    // Enqueue and thread start-up happen under the same lock the worker takes before
    // it clears m_AsyncThread.
    lock(m_QueuedRequests) {
        m_QueuedRequests.Enqueue(asyncRequest);
        if (m_AsyncThread == null) {
            m_AsyncThread = new Thread(new ThreadStart(AsyncThread));
            m_AsyncThread.IsBackground = true;
            m_AsyncThread.Start();
        }
    }
}

// Queues an AsyncConnectionPoolRequest to our queue of requests needing
// a pooled stream.
// If an AsyncThread is not created, we create one,
// and let it process the queued items

/// <summary>
/// Body of the background worker started by QueueRequest. It drains the queue of
/// asynchronous requests: for each one it waits (up to the request's CreationTimeout)
/// on the pool's wait handles, obtains a stream through Get and activates it with the
/// request's callback. Any failure is reported to the request's callback instead of
/// being thrown. Once the queue is empty the thread lingers for 500 ms and then, if the
/// queue is still empty, unregisters itself and exits.
/// </summary>
private void AsyncThread() {
    for (;;) {
        for (;;) {
            AsyncConnectionPoolRequest asyncState = null;
            bool dequeued = false;

            // Test and dequeue under the same lock QueueRequest uses to enqueue; Queue
            // is not safe for an unsynchronized Count read while another thread writes.
            lock (m_QueuedRequests) {
                if (m_QueuedRequests.Count > 0) {
                    asyncState = (AsyncConnectionPoolRequest) m_QueuedRequests.Dequeue();
                    dequeued = true;
                }
            }
            if (!dequeued) {
                break;
            }

            bool continueLoop = true;
            WaitHandle[] localWaitHandles = m_WaitHandles;
            PooledStream pooledStream = null;
            try {
                while ((pooledStream == null) && continueLoop) {
                    int result = WaitHandle.WaitAny(localWaitHandles, asyncState.CreationTimeout, false);
                    pooledStream = Get(asyncState.OwningObject, result, ref continueLoop, ref localWaitHandles);
                }

                // Get told us to stop waiting without handing back a stream. Report it
                // the same way the synchronous path of GetConnection does rather than
                // letting a null dereference surface as NullReferenceException.
                if (pooledStream == null) {
                    throw new InternalException();
                }

                pooledStream.Activate(asyncState.OwningObject, asyncState.AsyncCallback);
            }
            catch (Exception e) {
                FailAsyncRequest(asyncState, pooledStream, e);
            }
            catch {
                FailAsyncRequest(asyncState, pooledStream, new Exception(SR.GetString(SR.net_nonClsCompliantException)));
            }
        }

        // Linger briefly so a burst of requests can reuse this thread.
        Thread.Sleep(500);

        lock (m_QueuedRequests) {
            // Only unregister while holding the lock, so QueueRequest either sees this
            // thread (and we see its item) or starts a fresh one.
            if (m_QueuedRequests.Count == 0) {
                m_AsyncThread = null;
                break;
            }
        }
    }
}

/// <summary>
/// Failure path shared by AsyncThread's catch handlers: closes and returns the stream
/// (if one was obtained) and then hands the error to the request's callback.
/// </summary>
private void FailAsyncRequest(AsyncConnectionPoolRequest asyncState, PooledStream pooledStream, Exception error) {
    if (pooledStream != null) {
        pooledStream.Close();
        PutConnection(pooledStream, asyncState.OwningObject, asyncState.CreationTimeout);
    }
    asyncState.AsyncCallback(asyncState.OwningObject, error);
}

// Processes async queued requests that are blocked on needing a free pooled stream
// works as follows:
// 1. while there are blocked requests, take one out of the queue
// 2. Wait for a free connection, when one becomes avail, then notify the request that its there
// 3. repeat 1 until there are no more queued requests
// 4.
if there are no more requests waiting to for a free stream, then close down this thread /// ////// internal int Count { get { return(m_TotalObjects); } } ///Count of total pooled streams associated with this pool, including streams that are being used ////// internal ServicePoint ServicePoint { get { return m_ServicePoint; } } ///Our ServicePoint, used for IP resolution ////// internal int MaxPoolSize { get { return m_MaxPoolSize; } } ///Our Max Size of outstanding pooled streams ////// internal int MinPoolSize { get { return m_MinPoolSize; } } ///Our Min Size of the pool to remove idled items down to ////// private bool ErrorOccurred { get { return m_ErrorOccured; } } private static void CleanupCallbackWrapper(TimerThread.Timer timer, int timeNoticed, object context) { ConnectionPool pThis = (ConnectionPool) context; try { pThis.CleanupCallback(); } finally { pThis.m_CleanupQueue.CreateTimer(s_CleanupCallback, context); } } ///An Error occurred usually due to an abort ////// private void CleanupCallback() { // Called when the cleanup-timer ticks over. // // This is the automatic prunning method. Every period, we will perform a two-step // process. First, for the objects above MinPool, we will obtain the semaphore for // the object and then destroy it if it was on the old stack. We will continue this // until we either reach MinPool size, or we are unable to obtain a free object, or // until we have exhausted all the objects on the old stack. After that, push all // objects on the new stack to the old stack. So, every period the objects on the // old stack are destroyed and the objects on the new stack are pushed to the old // stack. All objects that are currently out and in use are not on either stack. // With this logic, a object is prunned if unused for at least one period but not // more than two periods. // Destroy free objects above MinPool size from old stack. while(Count > MinPoolSize) { // While above MinPoolSize... 
if (Semaphore.WaitOne(0, false) ) { // != WaitTimeout // We obtained a objects from the semaphore. PooledStream pooledStream = (PooledStream) m_StackOld.Pop(); if (null != pooledStream) { // If we obtained one from the old stack, destroy it. Destroy(pooledStream); } else { // Else we exhausted the old stack, so break. Semaphore.ReleaseSemaphore(); break; } } else break; } // Push to the old-stack. For each free object, move object from new stack // to old stack. if(Semaphore.WaitOne(0, false)) { // != WaitTimeout for(;;) { PooledStream pooledStream = (PooledStream) m_StackNew.Pop(); if (null == pooledStream) break; GlobalLog.Assert(!pooledStream.IsEmancipated, "Pooled object not in pool."); GlobalLog.Assert(pooledStream.CanBePooled, "Pooled object is not poolable."); m_StackOld.Push(pooledStream); } Semaphore.ReleaseSemaphore(); } } ///This is called by a timer, to check for needed cleanup of idle pooled streams ////// private PooledStream Create(CreateConnectionDelegate createConnectionCallback) { GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::Create"); PooledStream newObj = null; try { newObj = createConnectionCallback(this); if (null == newObj) throw new InternalException(); // Create succeeded, but null object if (!newObj.CanBePooled) throw new InternalException(); // Create succeeded, but non-poolable object newObj.PrePush(null); lock (m_ObjectList.SyncRoot) { m_ObjectList.Add(newObj); m_TotalObjects = m_ObjectList.Count; } GlobalLog.Print("Create pooledStream#"+ValidationHelper.HashString(newObj)); } catch(Exception e) { GlobalLog.Print("Pool Exception: " + e.Message); newObj = null; // set to null, so we do not return bad new object // Failed to create instance m_ResError = e; Abort(); } catch { GlobalLog.Print("Pool Exception: Non-CLS Compliant Exception"); newObj = null; // set to null, so we do not return bad new object // Failed to create instance m_ResError = new Exception(SR.GetString(SR.net_nonClsCompliantException)); 
Abort(); } GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Create",ValidationHelper.HashString(newObj)); return newObj; }
        // NOTE: the line above is the unchanged tail of Create(), whose beginning lies on the
        // preceding line of this file. Its (displaced) summary read:
        // "Creates a new PooledStream, performs checks as well on the new stream".

        /// <summary>
        /// Destroys a pooled stream from the pool.
        /// </summary>
        /// <remarks>
        /// The stream is removed from m_ObjectList (under the list's SyncRoot lock) and
        /// m_TotalObjects is refreshed from the list count. pooledStream.Destroy() is called
        /// from a finally block, so it runs even if the list update throws.
        /// </remarks>
        /// <param name="pooledStream">Stream to remove; Destroy() is skipped when it is null.</param>
        private void Destroy(PooledStream pooledStream) {
            GlobalLog.Print("Destroy pooledStream#"+ValidationHelper.HashString(pooledStream));

            try {
                lock (m_ObjectList.SyncRoot) {
                    m_ObjectList.Remove(pooledStream);
                    m_TotalObjects = m_ObjectList.Count;
                }
            }
            finally {
                if (null != pooledStream) {
                    pooledStream.Destroy();
                }
            }
        }

        /// <summary>
        /// Static timer callback (see s_CancelErrorCallback): forwards to
        /// CancelErrorCallback() on the pool passed as <paramref name="context"/>.
        /// </summary>
        private static void CancelErrorCallbackWrapper(TimerThread.Timer timer, int timeNoticed, object context) {
            ((ConnectionPool) context).CancelErrorCallback();
        }

        /// <summary>
        /// Called on error, after we waited a set amount of time from aborting.
        /// </summary>
        /// <remarks>
        /// The error state (flag, event, timer and stashed exception) is only cleared when a
        /// timer is present and its Cancel() call returns true.
        /// </remarks>
        private void CancelErrorCallback() {
            // Work on a snapshot; m_ErrorTimer is replaced by Abort() and cleared below.
            TimerThread.Timer timer = m_ErrorTimer;
            if (timer != null && timer.Cancel()) {
                m_ErrorOccured = false;
                ErrorEvent.Reset();
                m_ErrorTimer = null;
                m_ResError = null;
            }
        }

        /// <summary>
        /// Retrieves a pooled stream from the pool proper.
        /// This works by first attempting to find something in the pool on the New stack
        /// and then trying the Old stack if nothing is available there.
        /// </summary>
        /// <param name="owningObject">Passed to PostPop() on the stream that was popped.</param>
        /// <returns>The popped stream, or null if both stacks were empty.</returns>
        private PooledStream GetFromPool(object owningObject) {
            PooledStream res = null;
            GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetFromPool");

            res = (PooledStream) m_StackNew.Pop();
            if (null == res) {
                res = (PooledStream) m_StackOld.Pop();
            }

            // Shouldn't be null, we could assert here.
            GlobalLog.Assert(res != null, "GetFromPool called with nothing in the pool!");

            if (null != res) {
                res.PostPop(owningObject);
                GlobalLog.Print("GetFromGeneralPool pooledStream#"+ValidationHelper.HashString(res));
            }

            GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetFromPool",ValidationHelper.HashString(res));
            return res;
        }

        /// <summary>
        /// Retrieves the pooled stream out of the pool. Does this by using the result
        /// of a WaitAny as input, and then based on whether it has a mutex, event, semaphore,
        /// or timeout decides what action to take.
        /// </summary>
        /// <param name="owningObject">Object passed to PostPop() on the returned stream.</param>
        /// <param name="result">Return value of the WaitAny call made by the caller.</param>
        /// <param name="continueLoop">
        /// Set to true when the caller should wait again (creation was attempted but produced
        /// no stream); set to false in every other case.
        /// </param>
        /// <param name="waitHandles">
        /// The caller's handle array. Replaced by a two-element array (semaphore + error event)
        /// when the pool is at MaxPoolSize and nothing could be reclaimed, so that the caller
        /// stops waiting on the creation mutex.
        /// </param>
        /// <returns>A stream, or null when none could be obtained in this round.</returns>
        /// <exception cref="WebException">
        /// On wait timeout, or when the error event was signaled (in which case the stashed
        /// m_ResError is thrown, which may be any Exception type).
        /// </exception>
        private PooledStream Get(object owningObject, int result, ref bool continueLoop, ref WaitHandle [] waitHandles) {
            PooledStream pooledStream = null;
            GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get", result.ToString());

            // From the WaitAny docs: "If more than one object became signaled during
            // the call, this is the array index of the signaled object with the
            // smallest index value of all the signaled objects."  This is important
            // so that the free object signal will be returned before a creation
            // signal.
            switch (result) {
                case WaitTimeout:
                    Interlocked.Decrement(ref m_WaitCount);
                    continueLoop = false;
                    GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get","throw Timeout WebException");
                    // NOTE(review): the message is built for ConnectFailure while the status is
                    // Timeout; kept as-is because callers may depend on the existing text.
                    throw new WebException(NetRes.GetWebStatusString("net_timeout", WebExceptionStatus.ConnectFailure), WebExceptionStatus.Timeout);

                case ErrorHandleIndex:
                    // Throw the error that PoolCreateRequest stashed.
                    int newWaitCount = Interlocked.Decrement(ref m_WaitCount);
                    continueLoop = false;
                    // Snapshot before CancelErrorCallback() below clears the field.
                    Exception exceptionToThrow = m_ResError;
                    if (newWaitCount == 0) {
                        CancelErrorCallback();
                    }
                    if (exceptionToThrow == null) {
                        // m_ResError can be cleared by a concurrent CancelErrorCallback() (timer
                        // thread or PutConnection) after the error event woke us up. Throwing
                        // null would surface as a NullReferenceException, so fall back to the
                        // same "request aborted" error that Abort() creates.
                        exceptionToThrow = new WebException(
                            NetRes.GetWebStatusString("net_requestaborted", WebExceptionStatus.RequestCanceled),
                            WebExceptionStatus.RequestCanceled);
                    }
                    throw exceptionToThrow;

                case CreationHandleIndex:
                    // We own the creation mutex here; it is released in the finally block.
                    try {
                        continueLoop = true;
                        pooledStream = UserCreateRequest();

                        if (null != pooledStream) {
                            pooledStream.PostPop(owningObject);
                            Interlocked.Decrement(ref m_WaitCount);
                            continueLoop = false;
                        }
                        else {
                            // If we were not able to create an object, check to see if
                            // we reached MaxPoolSize.  If so, we will no longer wait on
                            // the CreationHandle, but instead wait for a free object or
                            // the timeout.
                            if (Count >= MaxPoolSize && 0 != MaxPoolSize) {
                                if (!ReclaimEmancipatedObjects()) {
                                    // modify handle array not to wait on creation mutex anymore
                                    waitHandles = new WaitHandle[2];
                                    waitHandles[SemaphoreHandleIndex] = m_WaitHandles[SemaphoreHandleIndex];
                                    waitHandles[ErrorHandleIndex] = m_WaitHandles[ErrorHandleIndex];
                                }
                            }
                        }
                    }
                    finally {
                        CreationMutex.ReleaseMutex();
                    }
                    break;

                default:
                    //
                    //    guaranteed available inventory
                    //
                    Interlocked.Decrement(ref m_WaitCount);
                    pooledStream = GetFromPool(owningObject);
                    continueLoop = false;
                    break;
            }

            GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get",ValidationHelper.HashString(pooledStream));
            return pooledStream;
        }

        /// <summary>
        /// Aborts the queued requests to the pool.
        /// </summary>
        /// <remarks>
        /// Stashes a request-canceled WebException in m_ResError unless an error is already
        /// stored, signals the error event, sets the error flag, and starts a timer on
        /// s_CancelErrorQueue whose callback is s_CancelErrorCallback.
        /// </remarks>
        internal void Abort() {
            if (m_ResError == null) {
                m_ResError = new WebException(
                        NetRes.GetWebStatusString("net_requestaborted", WebExceptionStatus.RequestCanceled),
                        WebExceptionStatus.RequestCanceled);
            }
            ErrorEvent.Set();
            m_ErrorOccured = true;
            m_ErrorTimer = s_CancelErrorQueue.CreateTimer(s_CancelErrorCallback, this);
        }

        /// <summary>
        /// Attempts to create a PooledStream, by trying to get a pooled Connection,
        /// or by creating its own new one.
        /// </summary>
        /// <param name="owningObject">Object that will own the returned stream.</param>
        /// <param name="asyncCallback">
        /// When non-null the call never blocks: the handles are polled once and, if no stream
        /// is obtained, the request is queued via QueueRequest().
        /// </param>
        /// <param name="creationTimeout">Timeout passed to WaitAny in the synchronous path.</param>
        /// <returns>
        /// The activated stream, or null when the request was queued or when
        /// stream.Activate() returned false (completion is then reported via the callback).
        /// </returns>
        /// <exception cref="InternalException">
        /// The pool is not in the Running state, or the synchronous path ended without a stream.
        /// </exception>
        internal PooledStream GetConnection(object owningObject, GeneralAsyncDelegate asyncCallback, int creationTimeout) {
            int result;
            PooledStream stream = null;
            bool continueLoop = true;
            bool async = (asyncCallback != null);

            GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetConnection");

            if(m_State != State.Running) {
                throw new InternalException();
            }

            Interlocked.Increment(ref m_WaitCount);
            // Local copy: Get() may swap this array for one without the creation mutex.
            WaitHandle[] localWaitHandles = m_WaitHandles;

            if (async) {
                // Timeout 0: poll the handles without blocking.
                result = WaitHandle.WaitAny(localWaitHandles, 0, false);
                if (result != WaitTimeout) {
                    stream = Get(owningObject, result, ref continueLoop, ref localWaitHandles);
                }
                if (stream == null) {
                    GlobalLog.Print("GetConnection:"+ValidationHelper.HashString(this)+" going async");
                    AsyncConnectionPoolRequest asyncState = new AsyncConnectionPoolRequest(this, owningObject, asyncCallback, creationTimeout);
                    QueueRequest(asyncState);
                }
            }
            else {
                // loop while we don't have an error/timeout and we haven't gotten a stream yet
                while ((stream == null) && continueLoop) {
                    result = WaitHandle.WaitAny(localWaitHandles, creationTimeout, false);
                    stream = Get(owningObject, result, ref continueLoop, ref localWaitHandles);
                }
            }

            if (null != stream) {
                // if there is already a stream, then we're not going async
                if (!stream.IsInitalizing) {
                    asyncCallback = null;
                }

                try {
                    // If activate returns false, it is going to finish asynchronously
                    // and therefore the stream will be returned in a callback and
                    // we should not return it here (return null)
                    if (stream.Activate(owningObject, asyncCallback) == false) {
                        stream = null;
                    }
                }
                catch {
                    // Activation failed: close the stream and hand it back to the pool
                    // before propagating the original exception.
                    stream.Close();
                    PutConnection(stream,owningObject,creationTimeout);
                    throw;
                }
            }
            else if (!async) {
                throw new InternalException();
            }

            GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetConnection", ValidationHelper.HashString(stream));
            return stream;
        }

        /// <summary>
        /// Attempts to return a PooledStream to the pool.
        /// </summary>
        /// <remarks>
        /// A stream that can no longer be pooled is destroyed; if requests are still waiting,
        /// a replacement is created under the creation mutex. If the mutex cannot be acquired
        /// within <paramref name="creationTimeout"/>, Abort() is called. While shutting down,
        /// the stream is simply destroyed.
        /// </remarks>
        /// <param name="pooledStream">Stream being returned.</param>
        /// <param name="owningObject">Passed to PrePush() on the stream.</param>
        /// <param name="creationTimeout">Maximum wait for the creation mutex.</param>
        /// <exception cref="ArgumentNullException"><paramref name="pooledStream"/> is null.</exception>
        internal void PutConnection(PooledStream pooledStream, object owningObject, int creationTimeout) {
            GlobalLog.Print("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutConnection");
            if (pooledStream == null) {
                throw new ArgumentNullException("pooledStream");
            }

            pooledStream.PrePush(owningObject);

            if (m_State != State.ShuttingDown) {
                pooledStream.Deactivate();

                // cancel our error status, if we have no new requests waiting anymore
                if (m_WaitCount == 0) {
                    CancelErrorCallback();
                }

                if (pooledStream.CanBePooled) {
                    PutNew(pooledStream);
                }
                else {
                    Destroy(pooledStream);

                    // Make sure we recreate a new pooled stream, if there are requests for a stream
                    // at this point
                    if (m_WaitCount > 0) {
                        if (!CreationMutex.WaitOne(creationTimeout, false)) {
                            Abort();
                        }
                        else {
                            try {
                                pooledStream = UserCreateRequest();
                                if (null != pooledStream) {
                                    PutNew(pooledStream);
                                }
                            }
                            finally {
                                CreationMutex.ReleaseMutex();
                            }
                        }
                    }
                }
            }
            else {
                // If we're shutting down, we destroy the object.
                Destroy(pooledStream);
            }
        }

        /// <summary>
        /// Places a new/reusable stream in the new stack of the pool.
        /// </summary>
        /// <remarks>
        /// After the push the pool semaphore is released once.
        /// </remarks>
        private void PutNew(PooledStream pooledStream) {
            GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutNew", "#"+ValidationHelper.HashString(pooledStream));

            GlobalLog.Assert(null != pooledStream, "Why are we adding a null object to the pool?");
            GlobalLog.Assert(pooledStream.CanBePooled, "Non-poolable object in pool.");

            m_StackNew.Push(pooledStream);
            Semaphore.ReleaseSemaphore();

            GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutNew");
        }

        /// <summary>
        /// Reclaim any pooled Streams that have seen their users/WebRequests GCed away.
        /// </summary>
        /// <remarks>
        /// Walks a snapshot of m_ObjectList while holding the list's SyncRoot lock. Each stream
        /// is only examined if its monitor can be taken without blocking; streams reporting
        /// IsEmancipated are handed to PutConnection().
        /// </remarks>
        /// <returns>true if at least one emancipated stream was found.</returns>
        private bool ReclaimEmancipatedObjects() {
            bool emancipatedObjectFound = false;
            GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::ReclaimEmancipatedObjects");

            lock(m_ObjectList.SyncRoot) {
                // Iterate over a copy: PutConnection() may remove entries from m_ObjectList.
                // (ArrayList.ToArray() never returns null, so no null check is needed.)
                object[] objectList = m_ObjectList.ToArray();

                for (int i = 0; i < objectList.Length; ++i) {
                    PooledStream pooledStream = (PooledStream) objectList[i];
                    if (null == pooledStream) {
                        continue;
                    }

                    bool locked = false;
                    try {
                        // Non-blocking: skip streams whose monitor is currently held elsewhere.
                        locked = Monitor.TryEnter(pooledStream);
                        if (locked) {
                            if (pooledStream.IsEmancipated) {
                                GlobalLog.Print("EmancipatedObject pooledStream#"+ValidationHelper.HashString(pooledStream));
                                PutConnection(pooledStream, null, Timeout.Infinite);
                                emancipatedObjectFound = true;
                            }
                        }
                    }
                    finally {
                        if (locked) {
                            Monitor.Exit(pooledStream);
                        }
                    }
                }
            }

            GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::ReclaimEmancipatedObjects",emancipatedObjectFound.ToString());
            return emancipatedObjectFound;
        }

        /// <summary>
        /// Creates a new PooledStream if allowable.
        /// </summary>
        /// <returns>
        /// The new stream, or null when an error is pending, the pool is at MaxPoolSize
        /// (a MaxPoolSize of 0 means no limit), a reclaim pass found emancipated streams, or
        /// Create() itself returned null.
        /// </returns>
        private PooledStream UserCreateRequest() {
            // called by user when they were not able to obtain a free object but
            // instead obtained creation mutex
            GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::UserCreateRequest");

            PooledStream pooledStream = null;

            if (!ErrorOccurred) {
                if (Count < MaxPoolSize || 0 == MaxPoolSize) {
                    // Reclaiming is only attempted when the total object count is even; with an
                    // odd count the short-circuit below goes straight to creating a new stream.
                    // A new stream is also created when the reclaim pass finds nothing. If the
                    // reclaim pass did find something, nothing is created here.
                    if ((Count & 0x1) == 0x1 || !ReclaimEmancipatedObjects()) {
                        pooledStream = Create(m_CreateConnectionCallback);
                    }
                }
            }

            GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::UserCreateRequest", ValidationHelper.HashString(pooledStream));
            return pooledStream;
        }
    }

    /// <summary>
    /// Used to Pool streams in a thread safe manner.
    /// </summary>
    /// <remarks>
    /// Every Push and Pop runs under a lock on the inner Stack's SyncRoot.
    /// </remarks>
    sealed internal class InterlockedStack {
        private readonly Stack _stack = new Stack();
        // Mirrors _stack.Count after each Push/Pop; within this class it is only read by the
        // DEBUG assertions.
        private int _count;

#if DEBUG
        // DEBUG only: tracks objects currently in the stack to detect double pushes.
        private readonly Hashtable doublepush = new Hashtable();
#endif

        internal InterlockedStack() {
        }

        /// <summary>
        /// Pushes an object onto the stack.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="pooledStream"/> is null.</exception>
        internal void Push(Object pooledStream) {
            GlobalLog.Assert(null != pooledStream, "push null");
            if (null == pooledStream) {
                throw new ArgumentNullException("pooledStream");
            }

            lock(_stack.SyncRoot) {
#if DEBUG
                GlobalLog.Assert(null == doublepush[pooledStream], "object already in stack");
                doublepush[pooledStream] = _stack.Count;
#endif
                _stack.Push(pooledStream);
#if DEBUG
                GlobalLog.Assert(_count+1 == _stack.Count, "push count mishandle");
#endif
                _count = _stack.Count;
            }
        }

        /// <summary>
        /// Pops the most recently pushed object.
        /// </summary>
        /// <returns>The popped object, or null when the stack is empty.</returns>
        internal Object Pop() {
            lock(_stack.SyncRoot) {
                object pooledStream = null;
                if (0 <_stack.Count) {
                    pooledStream = _stack.Pop();
#if DEBUG
                    GlobalLog.Assert(_count-1 == _stack.Count, "pop count mishandle");
                    doublepush.Remove(pooledStream);
#endif
                    _count = _stack.Count;
                }
                return pooledStream;
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- TabPageDesigner.cs
- ToolStripDropDownMenu.cs
- JsonDeserializer.cs
- InputScope.cs
- ImplicitInputBrush.cs
- SqlDependency.cs
- Region.cs
- UserControlAutomationPeer.cs
- BaseServiceProvider.cs
- LinearGradientBrush.cs
- ExpressionDumper.cs
- DiscardableAttribute.cs
- WindowsRebar.cs
- DynamicContractTypeBuilder.cs
- TimeSpanOrInfiniteValidator.cs
- GroupBoxAutomationPeer.cs
- Visual3DCollection.cs
- XmlAutoDetectWriter.cs
- DataSourceSelectArguments.cs
- ScalarOps.cs
- ProfileParameter.cs
- SafeFileMappingHandle.cs
- SplayTreeNode.cs
- WebPartZoneCollection.cs
- COM2EnumConverter.cs
- XdrBuilder.cs
- TablePattern.cs
- XamlStream.cs
- WebBrowserProgressChangedEventHandler.cs
- ToolboxItemAttribute.cs
- SqlNodeAnnotation.cs
- ThemeableAttribute.cs
- HighContrastHelper.cs
- ObjectDataSourceStatusEventArgs.cs
- DataGridColumnHeader.cs
- SocketSettings.cs
- CatalogZone.cs
- LongValidator.cs
- SspiNegotiationTokenAuthenticator.cs
- ValidationError.cs
- Cursors.cs
- NavigationHelper.cs
- ControlOperationInvoker.cs
- DeleteIndexBinder.cs
- WebPartMenuStyle.cs
- NCryptSafeHandles.cs
- RoutedUICommand.cs
- webproxy.cs
- HwndHostAutomationPeer.cs
- ManagedIStream.cs
- SkewTransform.cs
- NavigationService.cs
- RequestResizeEvent.cs
- DataServiceEntityAttribute.cs
- WrapPanel.cs
- FontEmbeddingManager.cs
- TextTreeRootNode.cs
- ClientFormsAuthenticationCredentials.cs
- FormatVersion.cs
- FullTextState.cs
- ErrorInfoXmlDocument.cs
- GPPOINT.cs
- ThreadStaticAttribute.cs
- __Filters.cs
- WebRequestModuleElement.cs
- AnnotationHighlightLayer.cs
- DynamicILGenerator.cs
- MenuCommand.cs
- Page.cs
- GroupBox.cs
- UIElement3D.cs
- ImageBrush.cs
- IndentedTextWriter.cs
- StructuralObject.cs
- DataBindEngine.cs
- DrawingContextWalker.cs
- MatcherBuilder.cs
- GenericTypeParameterBuilder.cs
- DrawTreeNodeEventArgs.cs
- FileSystemWatcher.cs
- DataGridViewCellMouseEventArgs.cs
- SqlInternalConnectionTds.cs
- SpellerInterop.cs
- AnnotationAdorner.cs
- LineBreakRecord.cs
- XmlNavigatorFilter.cs
- MemberExpressionHelper.cs
- EnvironmentPermission.cs
- WebBrowserNavigatingEventHandler.cs
- CompilerWrapper.cs
- VectorAnimation.cs
- CustomWebEventKey.cs
- IndexingContentUnit.cs
- CurrentTimeZone.cs
- ExtendedProperty.cs
- LockCookie.cs
- DiffuseMaterial.cs
- CqlWriter.cs
- ReceiveCompletedEventArgs.cs
- EntityDataSourceViewSchema.cs