webeventbuffer.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ FXUpdate3074 / FXUpdate3074 / 1.1 / DEVDIV / depot / DevDiv / releases / whidbey / QFE / ndp / fx / src / xsp / System / Web / Management / webeventbuffer.cs / 4 / webeventbuffer.cs

                            //------------------------------------------------------------------------------ 
// 
//     Copyright (c) Microsoft Corporation.  All rights reserved.
// 
//----------------------------------------------------------------------------- 

namespace System.Web.Management { 
    using System.Configuration; 
    using System.Web.Configuration;
    using System.Configuration.Provider; 
    using System.Collections.Specialized;
    using System.Collections;
    using System.Web.Util;
    using System.Web.Mail; 
    using System.Globalization;
    using System.Xml; 
    using System.Threading; 
    using System.Web.Hosting;
    using System.Security.Permissions; 

    /// <summary>
    /// Identifies why a batch of web events is being delivered to a buffered provider.
    /// </summary>
    public enum EventNotificationType
    {
        /// <summary>Regularly scheduled notification.</summary>
        Regular = 0,

        /// <summary>Urgent notification.</summary>
        Urgent = 1,

        /// <summary>Notification triggered by a user requested flush.</summary>
        Flush = 2,

        /// <summary>Notification fired when buffer=false.</summary>
        Unbuffered = 3,
    }
 
    /// <summary>
    /// Tells WebEventBuffer.Flush who is calling it, which decides whether it
    /// flushes immediately and/or schedules another flush.
    /// </summary>
    internal enum FlushCallReason {
        // Called from AddEvent because the buffer reached the urgent flush threshold.
        UrgentFlushThresholdExceeded = 0,

        // Called from the flush timer callback.
        Timer = 1,

        // Somebody called provider.Flush().
        StaticFlush = 2
    }
 
    /// <summary>
    /// Immutable snapshot handed to a flush callback: the events being flushed
    /// plus bookkeeping about the buffer at the moment of the flush.
    /// </summary>
    [AspNetHostingPermission(SecurityAction.LinkDemand, Level=AspNetHostingPermissionLevel.Minimal)]
    public sealed class WebEventBufferFlushInfo {
        readonly WebBaseEventCollection  _flushedEvents;
        readonly EventNotificationType   _kind;
        readonly int                     _sequence;
        readonly DateTime                _previousNotification;
        readonly int                     _discardedCount;
        readonly int                     _remainingCount;

        // Only the buffering infrastructure creates instances; every value is
        // stored exactly as supplied.
        internal WebEventBufferFlushInfo(  WebBaseEventCollection events,
                                            EventNotificationType notificationType,
                                            int notificationSequence,
                                            DateTime lastNotification,
                                            int eventsDiscardedSinceLastNotification,
                                            int eventsInBuffer) {
            this._flushedEvents = events;
            this._kind = notificationType;
            this._sequence = notificationSequence;
            this._previousNotification = lastNotification;
            this._discardedCount = eventsDiscardedSinceLastNotification;
            this._remainingCount = eventsInBuffer;
        }

        /// <summary>The events delivered by this notification.</summary>
        public WebBaseEventCollection  Events {
            get { return this._flushedEvents; }
        }

        /// <summary>The lastNotification value supplied at construction.</summary>
        public DateTime LastNotificationUtc {
            get { return this._previousNotification; }
        }

        /// <summary>Number of events discarded since the previous notification.</summary>
        public int EventsDiscardedSinceLastNotification {
            get { return this._discardedCount; }
        }

        /// <summary>The eventsInBuffer value supplied at construction.</summary>
        public int EventsInBuffer {
            get { return this._remainingCount; }
        }

        /// <summary>Sequence number assigned to this notification.</summary>
        public int NotificationSequence {
            get { return this._sequence; }
        }

        /// <summary>Why this notification was raised.</summary>
        public EventNotificationType NotificationType {
            get { return this._kind; }
        }

    }

    // Callback that WebEventBuffer.Flush invokes (outside its lock) with the
    // WebEventBufferFlushInfo describing the batch of events being flushed.
    internal delegate void WebEventBufferFlushCallback(WebEventBufferFlushInfo flushInfo); 

    internal sealed class WebEventBuffer {

        // Sentinel interval meaning "never".  The constructor stores it when a flush
        // interval is configured as TimeSpan.MaxValue.  Declared const (it was a
        // mutable static) so it can never be reassigned by accident.
        const long Infinite = Int64.MaxValue;

        // How long to wait for more events when a burst is anticipated (see
        // AnticipateBurst).  The constructor caps it at _urgentFlushIntervalMs.
        long        _burstWaitTimeMs = 2 * 1000;

        // Provider that owns this buffer; used to log exceptions thrown by the flush callback.
        readonly BufferedWebEventProvider    _provider;

        // Settings copied from the selected buffer mode in the constructor.
        long        _regularFlushIntervalMs;    // Period of regular flushes, or Infinite
        int         _urgentFlushThreshold;      // Buffer count that triggers an urgent flush
        int         _maxBufferSize;             // Oldest event is discarded once this many are buffered
        int         _maxFlushSize;              // Max events handed to the callback per timer/urgent flush
        long        _urgentFlushIntervalMs;     // Minimum spacing enforced when scheduling urgent flushes
        int         _maxBufferThreads;          // Max threads allowed inside the flush callback at once

        // The event queue.  It is also the lock object guarding all mutable state
        // below, hence readonly: the lock target must never change.
        readonly Queue       _buffer;
        Timer       _timer;                                     // Single one-shot timer, re-armed by SetTimer
        DateTime    _lastFlushTime = DateTime.MinValue;         // UTC time of the last flush that delivered events
        DateTime    _lastScheduledFlushTime = DateTime.MinValue;// UTC time of the last timer-driven flush that delivered events
        DateTime    _lastAdd = DateTime.MinValue;               // UTC time the previous event was added
        DateTime    _startTime = DateTime.MinValue;             // Origin of the regular flush schedule
        bool        _urgentFlushScheduled;                      // True while the armed timer is for an urgent flush
        int         _discardedSinceLastFlush = 0;               // Events dropped because the buffer was full
        int         _threadsInFlush = 0;                        // Threads currently delivering events
        int         _notificationSequence = 0;                  // Incremented for every notification delivered
        bool        _regularTimeoutUsed;                        // True when the armed timer follows the regular schedule

#if DBG
        // Debug-only statistics, maintained by SetTimer, AddEvent and DebugUpdateStats.
        DateTime    _nextFlush = DateTime.MinValue;
        DateTime    _lastRegularFlush = DateTime.MinValue;
        DateTime    _lastUrgentFlush = DateTime.MinValue;
        int         _totalAdded = 0;
        int         _totalFlushed = 0;
        int         _totalAbandoned = 0;
#endif

        // Callback that receives each flushed batch; asserted non-null in the constructor.
        readonly WebEventBufferFlushCallback _flushCallback;

        // Creates a buffer for 'provider' configured from the health monitoring
        // buffer mode named 'bufferMode'.  'callback' receives every flushed batch.
        // Throws ConfigurationErrorsException when the buffer mode does not exist.
        internal WebEventBuffer(BufferedWebEventProvider provider, string bufferMode,
                        WebEventBufferFlushCallback callback) {
            Debug.Assert(callback != null, "callback != null");

            _provider = provider;

            // Look up the requested buffer mode in the last-known-good app config.
            BufferModeSettings modeSettings =
                RuntimeConfig.GetAppLKGConfig().HealthMonitoring.BufferModes[bufferMode];
            if (modeSettings == null) {
                throw new ConfigurationErrorsException(SR.GetString(SR.Health_mon_buffer_mode_not_found, bufferMode));
            }

            _regularFlushIntervalMs = IntervalToMs(modeSettings.RegularFlushInterval);
            _urgentFlushIntervalMs = IntervalToMs(modeSettings.UrgentFlushInterval);

            _urgentFlushThreshold = modeSettings.UrgentFlushThreshold;
            _maxBufferSize = modeSettings.MaxBufferSize;
            _maxFlushSize = modeSettings.MaxFlushSize;
            _maxBufferThreads = modeSettings.MaxBufferThreads;

            // Never wait for a burst longer than the urgent flush interval itself.
            _burstWaitTimeMs = Math.Min(_burstWaitTimeMs, _urgentFlushIntervalMs);

            _flushCallback = callback;

            _buffer = new Queue();

            // Start the regular flush schedule unless regular flushing is disabled.
            bool regularFlushEnabled = !(_regularFlushIntervalMs == Infinite);
            if (regularFlushEnabled) {
                _startTime = DateTime.UtcNow;
                _regularTimeoutUsed = true;
                _urgentFlushScheduled = false;
                SetTimer(GetNextRegularFlushDueTimeInMs());
            }

            Debug.Trace("WebEventBuffer",
                        "\n_regularFlushIntervalMs=" + _regularFlushIntervalMs +
                        "\n_urgentFlushThreshold=" + _urgentFlushThreshold +
                        "\n_maxBufferSize=" + _maxBufferSize +
                        "\n_maxFlushSize=" + _maxFlushSize +
                        "\n_urgentFlushIntervalMs=" + _urgentFlushIntervalMs);
        }

        // Converts a configured interval to whole milliseconds.  TimeSpan.MaxValue
        // (and any conversion overflow) maps to the Infinite sentinel.
        static long IntervalToMs(TimeSpan interval) {
            if (interval == TimeSpan.MaxValue) {
                return Infinite;
            }

            try {
                return (long)interval.TotalMilliseconds;
            }
            catch (OverflowException) {
                return Infinite;
            }
        }

        // Timer entry point: flushes up to _maxFlushSize events.  'state' is unused.
        void FlushTimerCallback(object state) {
            this.Flush(this._maxFlushSize, FlushCallReason.Timer);
        }
 
        //
        // If we're in notification mode, meaning urgentFlushThreshold="1", we'll flush
        // as soon as there's an event in the buffer.
        // 
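        // For example, if bufferMode == "notification", we have a setting along these lines
        // (the original XML sample is missing from this copy; the values below are the ones
        // the rest of this comment refers to):
        //      urgentFlushThreshold="1", maxFlushSize="20", urgentFlushInterval="00:01:00"
        // 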
        // The ideal situation is that we have events coming in regularly,
        // and we flush (max 20 events at a time), wait for _urgentFlushIntervalMs (1 minute),
        // then flush the buffer, then wait 1 minute, then flush, and so on and on.
        // 
        // However, there is a scenario where there's been no event coming in, and suddenly
        // a burst of events (e.g. 20) arrive. If we flush immediately when the 1st event comes in, 
        // we then have to wait for 1 minute before we can flush the remaining 19 events. 
        //
        // To solve this problem, we demand that if we're in notification mode, and 
        // we just added an event to an empty buffer, then we may anticipate a burst
        // by waiting _burstWaitTimeMs amount of time (2s).
        //
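        // But how long does a buffer need to be empty before we anticipate a burst?
        // AnticipateBurst requires that at least _urgentFlushIntervalMs has elapsed
        // since the previous event was added.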

 
 

        // Returns true when the event just added is likely the first of a burst, so
        // the caller should wait _burstWaitTimeMs before flushing.  'now' is the
        // current UTC time; _lastAdd still holds the time of the previous add.
        bool AnticipateBurst(DateTime now) {
            // Only applies in notification mode (flush as soon as one event arrives).
            if (_urgentFlushThreshold != 1) {
                return false;
            }

            // Only applies when the event was added to an empty buffer.
            if (_buffer.Count != 1) {
                return false;
            }

            // The buffer must have been idle for at least one urgent flush interval.
            double idleMs = (now - _lastAdd).TotalMilliseconds;
            return idleMs >= _urgentFlushIntervalMs;
        }
 
        // Returns how many milliseconds from now the next regular flush is due, or
        // Infinite when regular flushing is disabled.  Regular flushes sit on a fixed
        // grid of multiples of _regularFlushIntervalMs measured from _startTime.
        long GetNextRegularFlushDueTimeInMs() {
            long intervalMs = _regularFlushIntervalMs;
            if (intervalMs == Infinite) {
                return Infinite;
            }

            // Work in whole milliseconds so the schedule does not drift through round-down.
            long elapsedMs = (long)((DateTime.UtcNow - _startTime).TotalMilliseconds);

            // The timer may fire slightly early (usually by less than 50ms).  Without
            // compensation an early callback would compute a due time of only a few
            // tens of milliseconds, so 499ms is added before rounding to the grid.
            long gridSlot = (elapsedMs + intervalMs + 499) / intervalMs;
            long nextDueFromStartMs = gridSlot * intervalMs;

            Debug.Assert(nextDueFromStartMs >= elapsedMs);

            return nextDueFromStartMs - elapsedMs;
        }
 
        // Arms the single one-shot flush timer to fire after waitTimeMs milliseconds,
        // creating the timer on first use and re-arming it afterwards.
        //
        // System.Threading.Timer rejects due times above 4294967294 ms.  Callers can
        // legitimately ask for more (a very long configured interval, or the Infinite
        // sentinel when both flush intervals are infinite), so the value is clamped to
        // the largest supported due time instead of letting the Timer call throw.
        // Flush recomputes the schedule whenever the timer fires, so firing early in
        // that case is harmless.
        void SetTimer(long waitTimeMs) {
            // Largest due time accepted by System.Threading.Timer (about 49.7 days).
            const long MaxTimerDueTimeMs = 0xfffffffeL;

            long dueTimeMs = Math.Min(waitTimeMs, MaxTimerDueTimeMs);

            if (_timer == null) {
                _timer = new System.Threading.Timer(new TimerCallback(this.FlushTimerCallback),
                                                null, dueTimeMs, Timeout.Infinite);
            }
            else {
                _timer.Change(dueTimeMs, Timeout.Infinite);
            }

#if DBG
            // Uses the clamped value: AddMilliseconds would overflow on Int64.MaxValue.
            _nextFlush = DateTime.UtcNow.AddMilliseconds(dueTimeMs);
#endif
        }
 
        // This method can be called by the timer, or by AddEvent.
        //
        // Basic design:
        // - We have one timer, and one buffer. 
        // - We flush periodically every _regularFlushIntervalMs ms
        // - But if # of items in buffer has reached _urgentFlushThreshold, we will flush more frequently, 
        //   but at most once every _urgentFlushIntervalMs ms.  However, these urgent flushes will not 
        //   prevent the regular flush from happening.
        // - We never flush synchronously, meaning if we're called by AddEvent and decide to flush 
        //   because we've reached the _urgentFlushThreshold, we will still use the timer thread
        //   to flush the buffer.
        // - At any point only a maximum of _maxBufferThreads threads can be flushing.  If exceeded,
        //   we will delay a flush. 
        //
        // 
 
        // For example, let's say we have this setting:
        // "1 minute urgentFlushInterval and 5 minute regularFlushInterval" 
        //
        // Assume regular flush timer starts at 10:00am.  It means regular
        // flush will happen at 10:05am, 10:10am, 10:15am, and so on,
        // regardless of when urgent flush happens. 
        //
        // An "urgent flush" happens whenever urgentFlushThreshold is reached. 
        // However, when we schedule an "urgent flush", we ensure that the time 
        // between an urgent flush and the last flush (no matter it's urgent or
        // regular) will be at least urgentFlushInterval. 
        //
        // One interesting case here.  Assume at 10:49:30 we had an urgent
        // flush, but the # of events left is still above urgentFlushThreshold.
        // You may think we'll schedule the next urgent flush at 10:50:30 
        // (urgentFlushInterval == 1 min).  However, because we know we will
        // have a regular flush at 10:50:00, we won't schedule the next urgent 
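        // flush.  Instead, the regular flush at 10:50:00 takes its place.  If the
        // buffer is still at or above urgentFlushThreshold after that flush, the next
        // urgent flush is scheduled from there.
        //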
 

        // Flushes up to 'max' events to the flush callback and/or schedules the next
        // flush, depending on 'reason':
        //   StaticFlush                  - flush now; do not schedule anything.
        //   Timer                        - flush now; re-arm the regular schedule if there is one.
        //   UrgentFlushThresholdExceeded - never flush now; arm the timer for an urgent flush
        //                                  (unless one is already scheduled).
        //
        // All buffer and timer state is touched under lock(_buffer).  The callback runs
        // outside the lock, under ApplicationImpersonationContext; any exception it
        // throws is handed to _provider.LogException and otherwise swallowed.
        //
        // Changes from the previous version:
        //  - _threadsInFlush is decremented in a finally block.  Before, an exception
        //    escaping the delivery code (e.g. ThreadAbortException, which is re-raised
        //    after the catch block, or a failure creating the impersonation context)
        //    left the counter raised, and after _maxBufferThreads such leaks nothing
        //    was ever flushed again.
        //  - The timer is null-checked before disposal, so a timer callback that runs
        //    after Shutdown has cleared _timer no longer throws NullReferenceException.
        internal void Flush(int max, FlushCallReason reason) {
            WebBaseEvent[]  events = null;
            DateTime    nowUtc = DateTime.UtcNow;
            long        waitTime = 0;
            DateTime    lastFlushTime = DateTime.MaxValue;
            int         discardedSinceLastFlush = -1;
            int         eventsInBuffer = -1;
            int         toFlush = 0;
            EventNotificationType   notificationType = EventNotificationType.Regular;

            // By default, this call will flush, but will not schedule the next flush.
            bool        flushThisTime = true;
            bool        scheduleNextFlush = false;
            bool        nextFlushIsUrgent = false;

            lock(_buffer) {
                Debug.Assert(max > 0, "max > 0");

                if (_buffer.Count == 0) {
                    // We have nothing in the buffer.  Don't flush this time.
                    Debug.Trace("WebEventBufferExtended", "Flush: buffer is empty, don't flush");
                    flushThisTime = false;
                }

                switch (reason) {
                case FlushCallReason.StaticFlush:
                    // It means somebody calls provider.Flush()
                    break;

                case FlushCallReason.Timer:
                    // It's a callback from a timer.  We will schedule the next regular flush if needed.

                    if (_regularFlushIntervalMs != Infinite) {
                        scheduleNextFlush = true;
                        waitTime = GetNextRegularFlushDueTimeInMs();
                    }
                    break;

                case FlushCallReason.UrgentFlushThresholdExceeded:
                    // It means this method is called by AddEvent because the urgent flush threshold is reached.

                    // If an urgent flush has already been scheduled by someone else, we don't need to duplicate the
                    // effort.  Just return.
                    if (_urgentFlushScheduled) {
                        return;
                    }

                    // Flush triggered by AddEvent isn't synchronous, so we won't flush this time, but will
                    // schedule an urgent flush instead.
                    flushThisTime = false;
                    scheduleNextFlush = true;
                    nextFlushIsUrgent = true;

                    // Calculate how long we have to wait when scheduling the flush
                    if (AnticipateBurst(nowUtc)) {
                        Debug.Trace("WebEventBuffer", "Flush: Called by AddEvent.  Waiting for burst");
                        waitTime = _burstWaitTimeMs;
                    }
                    else {
                        Debug.Trace("WebEventBuffer", "Flush: Called by AddEvent.  Schedule an immediate flush");
                        waitTime = 0;
                    }

                    // Have to wait longer because of _urgentFlushIntervalMs
                    long    msSinceLastScheduledFlush = (long)(nowUtc - _lastScheduledFlushTime).TotalMilliseconds;
                    if (msSinceLastScheduledFlush + waitTime < _urgentFlushIntervalMs ) {

                        Debug.Trace("WebEventBuffer", "Flush: Called by AddEvent.  Have to wait longer because of _urgentFlushIntervalMs.");
                        waitTime = _urgentFlushIntervalMs - msSinceLastScheduledFlush;
                    }

                    Debug.Trace("WebEventBuffer", "Wait time=" + waitTime +
                        "; nowUtc=" + PrintTime(nowUtc) +
                        "; _lastScheduledFlushTime=" + PrintTime(_lastScheduledFlushTime) +
                        "; _urgentFlushIntervalMs=" + _urgentFlushIntervalMs);

                    break;
                }

                Debug.Trace("WebEventBuffer", "Flush called: max=" + max +
                    "; reason=" + reason);

                if (flushThisTime) {
                    // Check if we've exceeded the # of flushing threads.  If so,
                    // don't flush this time.

                    if (_threadsInFlush >= _maxBufferThreads) {
                        // Won't set flushThisTime to false because we depend on
                        // the logic inside the next "if" block to schedule the
                        // next urgent flush as needed.
                        toFlush = 0;
                    }
                    else {
                        toFlush = Math.Min(_buffer.Count, max);
                    }
                }

#if DBG
                DebugUpdateStats(flushThisTime, nowUtc, toFlush, reason);
#endif

                if (flushThisTime) {
                    Debug.Assert(reason != FlushCallReason.UrgentFlushThresholdExceeded, "reason != FlushCallReason.UrgentFlushThresholdExceeded");

                    if (toFlush > 0) {
                        // Move the to-be-flushed events to an array
                        events = new WebBaseEvent[toFlush];

                        for (int i = 0; i < toFlush; i++) {
                            events[i] = (WebBaseEvent)_buffer.Dequeue();
                        }

                        lastFlushTime = _lastFlushTime;

                        // Update _lastFlushTime and _lastScheduledFlushTime.
                        // This information is used when Flush is called the next time.
                        _lastFlushTime = nowUtc;
                        if (reason == FlushCallReason.Timer) {
                            _lastScheduledFlushTime = nowUtc;
                        }

                        discardedSinceLastFlush = _discardedSinceLastFlush;
                        _discardedSinceLastFlush = 0;

                        if (reason == FlushCallReason.StaticFlush) {
                            notificationType = EventNotificationType.Flush;
                        }
                        else {
                            Debug.Assert(!(!_regularTimeoutUsed && !_urgentFlushScheduled),
                                "It's impossible to have a non-regular flush and yet the flush isn't urgent");

                            notificationType = _regularTimeoutUsed ?
                                               EventNotificationType.Regular :
                                               EventNotificationType.Urgent;
                        }
                    }

                    eventsInBuffer = _buffer.Count;

                    // If we still have at least _urgentFlushThreshold left, set timer
                    // to flush asap.
                    if (eventsInBuffer >= _urgentFlushThreshold) {
                        Debug.Trace("WebEventBuffer", "Flush: going to flush " + toFlush + " events, but still have at least _urgentFlushThreshold left. Schedule a flush");
                        scheduleNextFlush = true;
                        nextFlushIsUrgent = true;
                        waitTime = _urgentFlushIntervalMs;
                    }
                    else {
                        Debug.Trace("WebEventBuffer", "Flush: going to flush " + toFlush + " events");
                    }
                }

                // We are done moving the flushed events to the 'events' array.
                // Now schedule the next flush if needed.

                _urgentFlushScheduled = false;

                if (scheduleNextFlush) {
                    if (nextFlushIsUrgent) {
                        long nextRegular = GetNextRegularFlushDueTimeInMs();

                        // If next regular flush is closer than next urgent flush,
                        // use regular flush instead.
                        if (nextRegular < waitTime) {
                            Debug.Trace("WebEventBuffer", "Switch to use regular timeout");
                            waitTime = nextRegular;
                            _regularTimeoutUsed = true;
                        }
                        else {
                            _regularTimeoutUsed = false;
                        }
                    }
                    else {
                        _regularTimeoutUsed = true;
                    }

                    SetTimer(waitTime);
                    _urgentFlushScheduled = nextFlushIsUrgent;
#if DBG
                    Debug.Trace("WebEventBuffer", "Flush: Registered for a flush.  Waittime = " + waitTime + "ms" +
                        "; _nextFlush=" + PrintTime(_nextFlush) +
                        "; _urgentFlushScheduled=" + _urgentFlushScheduled);
#endif

                }

                // Cleanup.  If we are called by a timer callback, but we haven't scheduled for the next
                // one (can only happen if _regularFlushIntervalMs == Infinite), we should dispose the timer
                if (reason == FlushCallReason.Timer && !scheduleNextFlush) {
                    Debug.Trace("WebEventBuffer", "Flush: Disposing the timer");
                    Debug.Assert(_regularFlushIntervalMs == Infinite, "We can dispose the timer only if _regularFlushIntervalMs == Infinite");

                    // _timer may already be null if Shutdown ran before this callback.
                    if (_timer != null) {
                        ((IDisposable)_timer).Dispose();
                        _timer = null;
                    }
                    _urgentFlushScheduled = false;
                }

                // We want to increment the thread count within the lock to ensure we don't let too many threads in
                if (events != null) {
                    Interlocked.Increment(ref _threadsInFlush);
                }
            } // Release lock

            // Now call the providers to flush the events
            if (events != null) {
                Debug.Assert(lastFlushTime != DateTime.MaxValue, "lastFlushTime != DateTime.MaxValue");
                Debug.Assert(discardedSinceLastFlush != -1, "discardedSinceLastFlush != -1");
                Debug.Assert(eventsInBuffer != -1, "eventsInBuffer != -1");

                Debug.Trace("WebEventBufferSummary", "_threadsInFlush=" + _threadsInFlush);

                try {
                    using (new ApplicationImpersonationContext()) {
                        try {
                            WebEventBufferFlushInfo flushInfo = new WebEventBufferFlushInfo(
                                                                    new WebBaseEventCollection(events),
                                                                    notificationType,
                                                                    Interlocked.Increment(ref _notificationSequence),
                                                                    lastFlushTime,
                                                                    discardedSinceLastFlush,
                                                                    eventsInBuffer);

                            _flushCallback(flushInfo);
                        }
                        catch (Exception e) {
                            try {
                                _provider.LogException(e);
                            }
                            catch {
                                // Ignore all errors
                            }
                        }
                        catch { // non compliant exceptions are caught and logged as Unknown
                            try {
                                _provider.LogException(new Exception(SR.GetString(SR.Provider_Error)));
                            }
                            catch {
                                // Ignore all errors
                            }
                        }
                    }
                }
                finally {
                    // Always give the flush slot back, even if an exception escapes the
                    // handlers above; otherwise the _maxBufferThreads check would
                    // eventually block every future flush.
                    Interlocked.Decrement(ref _threadsInFlush);
                }
            }
        }

        // Appends webEvent to the buffer.  When the buffer is already full the oldest
        // buffered event is dropped first.  If the buffer then holds at least
        // _urgentFlushThreshold events, an urgent flush is requested.
        internal void AddEvent(WebBaseEvent webEvent) {
            lock (_buffer) {
#if DBG
                _totalAdded += 1;
#endif
                // A full buffer makes room by discarding in FIFO order (oldest first).
                bool bufferIsFull = (_maxBufferSize == _buffer.Count);
                if (bufferIsFull) {
                    Debug.Trace("WebEventBuffer", "Buffer is full.  Need to remove one from the tail");
                    _buffer.Dequeue();
                    _discardedSinceLastFlush += 1;
#if DBG
                    _totalAbandoned += 1;
#endif
                }

                _buffer.Enqueue(webEvent);

                // Reaching the urgent threshold asks Flush to schedule an urgent flush.
                bool belowUrgentThreshold = _buffer.Count < _urgentFlushThreshold;
                if (!belowUrgentThreshold) {
                    Flush(_maxFlushSize, FlushCallReason.UrgentFlushThresholdExceeded);
                }

                // Flush reads _lastAdd as the time of the previous add (not this one),
                // so it is only updated after Flush has been called.
                _lastAdd = DateTime.UtcNow;
            }   // Release the lock
        }

        // Stops the flush timer.  Events still in the buffer are not flushed here.
        //
        // Every other access to _timer (SetTimer and the cleanup in Flush) happens
        // under lock(_buffer).  Taking the same lock here keeps Shutdown from
        // disposing or clearing the timer while another thread is re-arming it.
        internal void Shutdown() {
            lock (_buffer) {
                if (_timer != null) {
                    _timer.Dispose();
                    _timer = null;
                }
            }
        }

        // Formats t for trace output as 24-hour "HH:mm:ss" followed by '.' and the
        // three-digit millisecond component, using the invariant culture.
        string PrintTime(DateTime t) {
            return t.ToString("HH:mm:ss.fff", CultureInfo.InvariantCulture);
        }

 
#if DBG 
        // DBG-only bookkeeping, called from Flush while the buffer lock is held.
        // Checks that every added event is accounted for, adds toFlush to the flushed
        // total and, for timer-driven calls, traces a summary and records when the
        // last regular/urgent flush happened.
        void DebugUpdateStats(bool flushThisTime, DateTime now, int toFlush, FlushCallReason reason) {
            Debug.Assert(_totalAdded == _totalAbandoned + _totalFlushed + _buffer.Count,
                    "_totalAdded == _totalAbandoned + _totalFlushed + _buffer.Count");

            _totalFlushed = _totalFlushed + toFlush;

            // Only timer callbacks are traced and timed.
            if (reason == FlushCallReason.Timer) {
                string summary =
                    "_Added=" + _totalAdded + "; deleted=" + _totalAbandoned +
                    "; Flushed=" + _totalFlushed + "; buffer=" + _buffer.Count +
                    "; toFlush=" + toFlush +
                    "; lastFlush=" + PrintTime(_lastRegularFlush) +
                    "; lastUrgentFlush=" + PrintTime(_lastUrgentFlush) +
                    "; GetRegFlushDueTime=" + GetNextRegularFlushDueTimeInMs() +
                    "; toFlush=" + toFlush +
                    "; now=" + PrintTime(now);
                Debug.Trace("WebEventBufferSummary", summary);

                if (_regularTimeoutUsed) {
                    // Regular callback.  (A check that regular flushes are spaced
                    // _regularFlushIntervalMs apart, within 2000ms, used to live here
                    // but was disabled.)
                    _lastRegularFlush = now;
                }
                else if (flushThisTime) {
                    // Urgent callback that actually flushes: consecutive urgent flushes
                    // must be at least _urgentFlushIntervalMs apart, allowing 499ms of
                    // timer slack.
                    Debug.Assert((now - _lastUrgentFlush).TotalMilliseconds + 499 > _urgentFlushIntervalMs,
                        "(now - _lastUrgentFlush).TotalMilliseconds + 499 > _urgentFlushIntervalMs" +
                        "\n_lastUrgentFlush=" + PrintTime(_lastUrgentFlush) +
                        "\nnow=" + PrintTime(now) +
                        "\n_urgentFlushIntervalMs=" + _urgentFlushIntervalMs);

                    _lastUrgentFlush = now;
                }
            }
        }

#endif
    } 
}
 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------------------------ 
// 
//     Copyright (c) Microsoft Corporation.  All rights reserved.
// 
//----------------------------------------------------------------------------- 

namespace System.Web.Management { 
    using System.Configuration; 
    using System.Web.Configuration;
    using System.Configuration.Provider; 
    using System.Collections.Specialized;
    using System.Collections;
    using System.Web.Util;
    using System.Web.Mail; 
    using System.Globalization;
    using System.Xml; 
    using System.Threading; 
    using System.Web.Hosting;
    using System.Security.Permissions; 

    /// <summary>
    /// Identifies why a batch of web events is being delivered to a buffered provider.
    /// </summary>
    public enum EventNotificationType
    {
        /// <summary>Regularly scheduled notification.</summary>
        Regular = 0,

        /// <summary>Urgent notification.</summary>
        Urgent = 1,

        /// <summary>Notification triggered by a user requested flush.</summary>
        Flush = 2,

        /// <summary>Notification fired when buffer=false.</summary>
        Unbuffered = 3,
    }
 
    /// <summary>
    /// Tells the event buffer's Flush method who is calling it.
    /// </summary>
    internal enum FlushCallReason {
        // The buffer reached the urgent flush threshold.
        UrgentFlushThresholdExceeded = 0,

        // The flush timer fired.
        Timer = 1,

        // An explicit (static) flush was requested.
        StaticFlush = 2
    }
 
    /// <summary>
    /// Immutable snapshot handed to a flush callback: the events being flushed
    /// plus bookkeeping about the buffer at the moment of the flush.
    /// </summary>
    [AspNetHostingPermission(SecurityAction.LinkDemand, Level=AspNetHostingPermissionLevel.Minimal)]
    public sealed class WebEventBufferFlushInfo {
        readonly WebBaseEventCollection  _flushedEvents;
        readonly EventNotificationType   _kind;
        readonly int                     _sequence;
        readonly DateTime                _previousNotification;
        readonly int                     _discardedCount;
        readonly int                     _remainingCount;

        // Only the buffering infrastructure creates instances; every value is
        // stored exactly as supplied.
        internal WebEventBufferFlushInfo(  WebBaseEventCollection events,
                                            EventNotificationType notificationType,
                                            int notificationSequence,
                                            DateTime lastNotification,
                                            int eventsDiscardedSinceLastNotification,
                                            int eventsInBuffer) {
            this._flushedEvents = events;
            this._kind = notificationType;
            this._sequence = notificationSequence;
            this._previousNotification = lastNotification;
            this._discardedCount = eventsDiscardedSinceLastNotification;
            this._remainingCount = eventsInBuffer;
        }

        /// <summary>The events delivered by this notification.</summary>
        public WebBaseEventCollection  Events {
            get { return this._flushedEvents; }
        }

        /// <summary>The lastNotification value supplied at construction.</summary>
        public DateTime LastNotificationUtc {
            get { return this._previousNotification; }
        }

        /// <summary>Number of events discarded since the previous notification.</summary>
        public int EventsDiscardedSinceLastNotification {
            get { return this._discardedCount; }
        }

        /// <summary>The eventsInBuffer value supplied at construction.</summary>
        public int EventsInBuffer {
            get { return this._remainingCount; }
        }

        /// <summary>Sequence number assigned to this notification.</summary>
        public int NotificationSequence {
            get { return this._sequence; }
        }

        /// <summary>Why this notification was raised.</summary>
        public EventNotificationType NotificationType {
            get { return this._kind; }
        }

    }

    // Signature of the callback that receives a WebEventBufferFlushInfo describing
    // a batch of flushed events.
    internal delegate void WebEventBufferFlushCallback(WebEventBufferFlushInfo flushInfo); 

    internal sealed class WebEventBuffer {

        // Sentinel interval meaning "this kind of flush is never scheduled".
        // Read-only: it is compared against throughout the class and must never change.
        static readonly long Infinite = Int64.MaxValue;

        // How long to wait for more events after the first event lands in an empty
        // buffer in notification mode (see AnticipateBurst).  Starts at 2 seconds; the
        // constructor caps it to _urgentFlushIntervalMs.
        long        _burstWaitTimeMs = 2 * 1000;

        // Provider that owns this buffer; used to log exceptions thrown by the flush callback.
        readonly BufferedWebEventProvider    _provider;

        // Settings copied from the selected buffer mode by the constructor.
        long        _regularFlushIntervalMs;
        int         _urgentFlushThreshold;
        int         _maxBufferSize;
        int         _maxFlushSize;
        long        _urgentFlushIntervalMs;
        int         _maxBufferThreads;

        // Queue of buffered events.  It doubles as the lock object that guards the
        // mutable state below, so the reference must never be replaced.
        readonly Queue       _buffer;
        // Single one-shot timer, re-armed by SetTimer for each scheduled flush.
        Timer       _timer;
        // Time of the last flush that actually dequeued events (any reason).
        DateTime    _lastFlushTime = DateTime.MinValue;
        // Time of the last timer-driven flush that actually dequeued events.
        DateTime    _lastScheduledFlushTime = DateTime.MinValue;
        // Time the previous event was added (updated at the end of AddEvent).
        DateTime    _lastAdd = DateTime.MinValue;
        // Origin of the regular flush schedule; set when the regular timer is first armed.
        DateTime    _startTime = DateTime.MinValue;
        // True while the pending timer was armed for an urgent flush.
        bool        _urgentFlushScheduled;
        // Events dropped by AddEvent because the buffer was full, since the last flush.
        int         _discardedSinceLastFlush = 0;
        // Number of threads currently inside the flush callback.
        int         _threadsInFlush = 0;
        // Incremented for every notification handed to the flush callback.
        int         _notificationSequence = 0;
        // True when the pending timer uses the regular schedule rather than the urgent one.
        bool        _regularTimeoutUsed;

#if DBG
        // Debug-only bookkeeping used by DebugUpdateStats and trace output.
        DateTime    _nextFlush = DateTime.MinValue;
        DateTime    _lastRegularFlush = DateTime.MinValue;
        DateTime    _lastUrgentFlush = DateTime.MinValue;
        int         _totalAdded = 0;
        int         _totalFlushed = 0;
        int         _totalAbandoned = 0;
#endif

        // Delegate invoked (outside the lock) with each flushed batch.
        readonly WebEventBufferFlushCallback _flushCallback;

        // Builds a buffer for 'provider' using the buffer mode named 'bufferMode' from
        // the health monitoring configuration section.  'callback' receives every
        // flushed batch.  Throws ConfigurationErrorsException when the named buffer
        // mode does not exist.  If a regular flush interval is configured, the first
        // regular flush is scheduled right away.
        internal WebEventBuffer(BufferedWebEventProvider provider, string bufferMode,
                        WebEventBufferFlushCallback callback) {
            Debug.Assert(callback != null, "callback != null");

            _provider = provider;

            // Look up the requested buffer mode.
            BufferModeSettings settings =
                RuntimeConfig.GetAppLKGConfig().HealthMonitoring.BufferModes[bufferMode];
            if (settings == null) {
                throw new ConfigurationErrorsException(SR.GetString(SR.Health_mon_buffer_mode_not_found, bufferMode));
            }

            // Copy the settings into fields; intervals are kept in milliseconds.
            _regularFlushIntervalMs = FlushIntervalToMs(settings.RegularFlushInterval);
            _urgentFlushIntervalMs = FlushIntervalToMs(settings.UrgentFlushInterval);

            _maxBufferThreads = settings.MaxBufferThreads;
            _maxFlushSize = settings.MaxFlushSize;
            _maxBufferSize = settings.MaxBufferSize;
            _urgentFlushThreshold = settings.UrgentFlushThreshold;

            // Never wait for a burst longer than the urgent flush interval itself.
            _burstWaitTimeMs = Math.Min(_burstWaitTimeMs, _urgentFlushIntervalMs);

            _flushCallback = callback;

            _buffer = new Queue();

            // Arm the timer for the first regular flush, if regular flushes are enabled.
            bool regularFlushEnabled = (_regularFlushIntervalMs != Infinite);
            if (regularFlushEnabled) {
                _startTime = DateTime.UtcNow;
                _regularTimeoutUsed = true;
                _urgentFlushScheduled = false;
                SetTimer(GetNextRegularFlushDueTimeInMs());
            }

            Debug.Trace("WebEventBuffer",
                        "\n_regularFlushIntervalMs=" + _regularFlushIntervalMs +
                        "\n_urgentFlushThreshold=" + _urgentFlushThreshold +
                        "\n_maxBufferSize=" + _maxBufferSize +
                        "\n_maxFlushSize=" + _maxFlushSize +
                        "\n_urgentFlushIntervalMs=" + _urgentFlushIntervalMs);
        }

        // Converts a configured flush interval to whole milliseconds.
        // TimeSpan.MaxValue, or a value whose conversion overflows, maps to Infinite.
        static long FlushIntervalToMs(TimeSpan interval) {
            if (interval == TimeSpan.MaxValue) {
                return Infinite;
            }

            try {
                return (long)interval.TotalMilliseconds;
            }
            catch (OverflowException) {
                return Infinite;
            }
        }

        // Entry point for the timer created in SetTimer.  'state' is not used.
        // Flushes at most _maxFlushSize events, identifying the timer as the caller.
        void FlushTimerCallback(object state) {
            int batchLimit = _maxFlushSize;
            Flush(batchLimit, FlushCallReason.Timer);
        }
 
        //
        // If we're in notification mode, meaning urgentFlushThreshold="1", we'll flush
        // as soon as there's an event in the buffer.
        // 
        // For example, if bufferMode == "notification", we have a setting along these lines:
        //   urgentFlushThreshold="1" maxFlushSize="20" urgentFlushInterval="00:01:00"
        //
        // The ideal situation is that we have events coming in regularly,
        // and we flush (max 20 events at a time), wait for _urgentFlushIntervalMs (1 minute),
        // then flush the buffer, then wait 1 minute, then flush, and so on and on.
        // 
        // However, there is a scenario where there's been no event coming in, and suddenly
        // a burst of events (e.g. 20) arrive. If we flush immediately when the 1st event comes in, 
        // we then have to wait for 1 minute before we can flush the remaining 19 events. 
        //
        // To solve this problem, we demand that if we're in notification mode, and 
        // we just added an event to an empty buffer, then we may anticipate a burst
        // by waiting _burstWaitTimeMs amount of time (2s).
        //
        // But how long does a buffer need to be empty before we anticipate a burst?
        // We require that no event has been added for at least _urgentFlushIntervalMs.

 
 

        // Decides whether the event just added is likely to be the first of a burst,
        // in which case the caller delays the flush by _burstWaitTimeMs.
        // 'now' is compared against _lastAdd, the time the previous event was added.
        bool AnticipateBurst(DateTime now) {
            // Only applies in notification mode (flush as soon as one event is buffered).
            if (_urgentFlushThreshold != 1) {
                return false;
            }

            // Only applies when the event just added went into an empty buffer.
            if (_buffer.Count != 1) {
                return false;
            }

            // The buffer must have been quiet for at least one urgent flush interval.
            double msSincePreviousAdd = (now - _lastAdd).TotalMilliseconds;
            return msSincePreviousAdd >= _urgentFlushIntervalMs;
        }
 
        // Returns how many milliseconds remain until the next regular flush, or
        // Infinite when regular flushes are disabled.  Regular flushes sit on a fixed
        // grid: _startTime + k * _regularFlushIntervalMs.
        long GetNextRegularFlushDueTimeInMs() {
            long intervalMs = _regularFlushIntervalMs;

            if (intervalMs == Infinite) {
                return Infinite;
            }

            // Work in whole milliseconds since _startTime so that rounding does not
            // make the schedule drift.
            TimeSpan sinceStart = DateTime.UtcNow - _startTime;
            long elapsedMs = (long)sinceStart.TotalMilliseconds;

            // The timer may fire slightly early.  Without compensation, a callback that
            // runs a few tens of milliseconds before its grid point would compute a due
            // time of those few milliseconds.  Adding 499 ms before dividing makes such
            // a callback round to the following grid point instead.
            long gridIndex = (elapsedMs + intervalMs + 499) / intervalMs;
            long nextFlushFromStartMs = gridIndex * intervalMs;

            Debug.Assert(nextFlushFromStartMs >= elapsedMs);

            return nextFlushFromStartMs - elapsedMs;
        }
 
        // Arms the one-shot flush timer to fire FlushTimerCallback after waitTimeMs
        // milliseconds.  Re-arms the existing timer when there is one; otherwise
        // creates it.  The period is Timeout.Infinite, so the timer never repeats
        // by itself.
        void SetTimer(long waitTimeMs) {
            if (_timer != null) {
                _timer.Change(waitTimeMs, Timeout.Infinite);
            }
            else {
                TimerCallback onTimer = new TimerCallback(this.FlushTimerCallback);
                _timer = new System.Threading.Timer(onTimer, null, waitTimeMs, Timeout.Infinite);
            }

#if DBG
            // Remember when the flush is expected, for trace output.
            _nextFlush = DateTime.UtcNow.AddMilliseconds(waitTimeMs);
#endif
        }
 
        // This method can be called by the timer, or by AddEvent.
        //
        // Basic design:
        // - We have one timer, and one buffer. 
        // - We flush periodically every _regularFlushIntervalMs ms
        // - But if # of items in buffer has reached _urgentFlushThreshold, we will flush more frequently, 
        //   but at most once every _urgentFlushIntervalMs ms.  However, these urgent flushes will not 
        //   prevent the regular flush from happening.
        // - We never flush synchronously, meaning if we're called by AddEvent and decide to flush 
        //   because we've reached the _urgentFlushThreshold, we will still use the timer thread
        //   to flush the buffer.
        // - At any point only a maximum of _maxBufferThreads threads can be flushing.  If exceeded,
        //   we will delay a flush. 
        //
        // 
 
        // For example, let's say we have this setting:
        // "1 minute urgentFlushInterval and 5 minute regularFlushInterval" 
        //
        // Assume regular flush timer starts at 10:00am.  It means regular
        // flush will happen at 10:05am, 10:10am, 10:15am, and so on,
        // regardless of when urgent flush happens. 
        //
        // An "urgent flush" happens whenever urgentFlushThreshold is reached. 
        // However, when we schedule an "urgent flush", we ensure that the time 
        // between an urgent flush and the last flush (no matter it's urgent or
        // regular) will be at least urgentFlushInterval. 
        //
        // One interesting case here.  Assume at 10:49:30 we had an urgent
        // flush, but the # of events left is still above urgentFlushThreshold.
        // You may think we'll schedule the next urgent flush at 10:50:30 
        // (urgentFlushInterval == 1 min).  However, because we know we will
        // have a regular flush at 10:50:00, we won't schedule the next urgent 
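        // flush.  Instead, when the regular flush happens at 10:50:00, we'll
        // check whether the buffer still holds at least urgentFlushThreshold
        // events and, if so, schedule the next urgent flush for 10:51:00.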
 

        // Flushes up to 'max' buffered events and/or schedules the next flush,
        // depending on 'reason':
        //   StaticFlush                  - flush now; no regular rescheduling.
        //   Timer                        - flush now and schedule the next regular flush (if any).
        //   UrgentFlushThresholdExceeded - never flushes synchronously; only arms the
        //                                  timer for an urgent flush (called by AddEvent).
        // All bookkeeping happens under lock(_buffer); the flush callback itself is
        // invoked after the lock is released.  Exceptions thrown by the callback are
        // reported via _provider.LogException and otherwise swallowed.
        internal void Flush(int max, FlushCallReason reason) {
            WebBaseEvent[]  events = null;
            DateTime    nowUtc = DateTime.UtcNow;
            long        waitTime = 0;
            DateTime    lastFlushTime = DateTime.MaxValue;
            int         discardedSinceLastFlush = -1;
            int         eventsInBuffer = -1;
            int         toFlush = 0;
            EventNotificationType   notificationType = EventNotificationType.Regular;

            // By default, this call will flush, but will not schedule the next flush.
            bool        flushThisTime = true;
            bool        scheduleNextFlush = false;
            bool        nextFlushIsUrgent = false;

            lock(_buffer) {
                Debug.Assert(max > 0, "max > 0");

                if (_buffer.Count == 0) {
                    // We have nothing in the buffer.  Don't flush this time.
                    Debug.Trace("WebEventBufferExtended", "Flush: buffer is empty, don't flush");
                    flushThisTime = false;
                }

                switch (reason) {
                case FlushCallReason.StaticFlush:
                    // It means somebody calls provider.Flush()
                    break;

                case FlushCallReason.Timer:
                    // It's a callback from a timer.  We will schedule the next regular flush if needed.

                    if (_regularFlushIntervalMs != Infinite) {
                        scheduleNextFlush = true;
                        waitTime = GetNextRegularFlushDueTimeInMs();
                    }
                    break;

                case FlushCallReason.UrgentFlushThresholdExceeded:
                    // It means this method is called by AddEvent because the urgent flush threshold is reached.

                    // If an urgent flush has already been scheduled by someone else, we don't need to duplicate the
                    // effort.  Just return.
                    if (_urgentFlushScheduled) {
                        return;
                    }

                    // Flush triggered by AddEvent isn't synchronous, so we won't flush this time, but will
                    // schedule an urgent flush instead.
                    flushThisTime = false;
                    scheduleNextFlush = true;
                    nextFlushIsUrgent = true;

                    // Calculate how long we have to wait when scheduling the flush
                    if (AnticipateBurst(nowUtc)) {
                        Debug.Trace("WebEventBuffer", "Flush: Called by AddEvent.  Waiting for burst");
                        waitTime = _burstWaitTimeMs;
                    }
                    else {
                        Debug.Trace("WebEventBuffer", "Flush: Called by AddEvent.  Schedule an immediate flush");
                        waitTime = 0;
                    }

                    // Have to wait longer because of _urgentFlushIntervalMs
                    long    msSinceLastScheduledFlush = (long)(nowUtc - _lastScheduledFlushTime).TotalMilliseconds;
                    if (msSinceLastScheduledFlush + waitTime < _urgentFlushIntervalMs ) {

                        Debug.Trace("WebEventBuffer", "Flush: Called by AddEvent.  Have to wait longer because of _urgentFlushIntervalMs.");
                        waitTime = _urgentFlushIntervalMs - msSinceLastScheduledFlush;
                    }

                    Debug.Trace("WebEventBuffer", "Wait time=" + waitTime +
                        "; nowUtc=" + PrintTime(nowUtc) +
                        "; _lastScheduledFlushTime=" + PrintTime(_lastScheduledFlushTime) +
                        "; _urgentFlushIntervalMs=" + _urgentFlushIntervalMs);

                    break;
                }

                Debug.Trace("WebEventBuffer", "Flush called: max=" + max +
                    "; reason=" + reason);

                if (flushThisTime) {
                    // Check if we've exceeded the # of flushing threads.  If so,
                    // don't flush this time.

                    if (_threadsInFlush >= _maxBufferThreads) {
                        // Won't set flushThisTime to false because we depend on
                        // the logic inside the next "if" block to schedule the
                        // next urgent flush as needed.
                        toFlush = 0;
                    }
                    else {
                        toFlush = Math.Min(_buffer.Count, max);
                    }
                }

#if DBG
                DebugUpdateStats(flushThisTime, nowUtc, toFlush, reason);
#endif

                if (flushThisTime) {
                    Debug.Assert(reason != FlushCallReason.UrgentFlushThresholdExceeded, "reason != FlushCallReason.UrgentFlushThresholdExceeded");

                    if (toFlush > 0) {
                        // Move the to-be-flushed events to an array
                        events = new WebBaseEvent[toFlush];

                        for (int i = 0; i < toFlush; i++) {
                            events[i] = (WebBaseEvent)_buffer.Dequeue();
                        }

                        // Capture the previous flush time for the notification before overwriting it.
                        lastFlushTime = _lastFlushTime;

                        // Update _lastFlushTime and _lastScheduledFlushTime.
                        // These information are used when Flush is called the next time.
                        _lastFlushTime = nowUtc;
                        if (reason == FlushCallReason.Timer) {
                            _lastScheduledFlushTime = nowUtc;
                        }

                        discardedSinceLastFlush = _discardedSinceLastFlush;
                        _discardedSinceLastFlush = 0;

                        if (reason == FlushCallReason.StaticFlush) {
                            notificationType = EventNotificationType.Flush;
                        }
                        else {
                            Debug.Assert(!(!_regularTimeoutUsed && !_urgentFlushScheduled),
                                "It's impossible to have a non-regular flush and yet the flush isn't urgent");

                            notificationType = _regularTimeoutUsed ?
                                               EventNotificationType.Regular :
                                               EventNotificationType.Urgent;
                        }
                    }

                    eventsInBuffer = _buffer.Count;

                    // If we still have at least _urgentFlushThreshold left, set timer
                    // to flush asap.
                    if (eventsInBuffer >= _urgentFlushThreshold) {
                        Debug.Trace("WebEventBuffer", "Flush: going to flush " + toFlush + " events, but still have at least _urgentFlushThreshold left. Schedule a flush");
                        scheduleNextFlush = true;
                        nextFlushIsUrgent = true;
                        waitTime = _urgentFlushIntervalMs;
                    }
                    else {
                        Debug.Trace("WebEventBuffer", "Flush: going to flush " + toFlush + " events");
                    }
                }

                // We are done moving the flushed events to the 'events' array.
                // Now schedule the next flush if needed.

                _urgentFlushScheduled = false;

                if (scheduleNextFlush) {
                    if (nextFlushIsUrgent) {
                        long nextRegular = GetNextRegularFlushDueTimeInMs();

                        // If next regular flush is closer than next urgent flush,
                        // use regular flush instead.
                        if (nextRegular < waitTime) {
                            Debug.Trace("WebEventBuffer", "Switch to use regular timeout");
                            waitTime = nextRegular;
                            _regularTimeoutUsed = true;
                        }
                        else {
                            _regularTimeoutUsed = false;
                        }
                    }
                    else {
                        _regularTimeoutUsed = true;
                    }

                    SetTimer(waitTime);
                    _urgentFlushScheduled = nextFlushIsUrgent;
#if DBG
                    Debug.Trace("WebEventBuffer", "Flush: Registered for a flush.  Waittime = " + waitTime + "ms" +
                        "; _nextFlush=" + PrintTime(_nextFlush) +
                        "; _urgentFlushScheduled=" + _urgentFlushScheduled);
#endif

                }

                // Cleanup.  If we are called by a timer callback, but we haven't scheduled for the next
                // one (can only happen if _regularFlushIntervalMs == Infinite), we should dispose the timer
                if (reason == FlushCallReason.Timer && !scheduleNextFlush) {
                    Debug.Trace("WebEventBuffer", "Flush: Disposing the timer");
                    Debug.Assert(_regularFlushIntervalMs == Infinite, "We can dispose the timer only if _regularFlushIntervalMs == Infinite");

                    // _timer can already be null here: Shutdown() may have disposed and
                    // cleared it while this timer callback was in flight.  Guard against
                    // that instead of throwing NullReferenceException on the timer thread.
                    Timer pendingTimer = _timer;
                    if (pendingTimer != null) {
                        pendingTimer.Dispose();
                    }
                    _timer = null;
                    _urgentFlushScheduled = false;
                }

                // We want to increment the thread count within the lock to ensure we don't let too many threads in
                if (events != null) {
                    Interlocked.Increment(ref _threadsInFlush);
                }
            } // Release lock

            // Now call the providers to flush the events
            if (events != null) {
                // The thread count was incremented under the lock above.  It must be
                // released on every exit path (including a thread abort, or a failure
                // while entering or leaving the impersonation context); otherwise the
                // count leaks and, once it reaches _maxBufferThreads, nothing is ever
                // flushed again.
                try {
                    Debug.Assert(lastFlushTime != DateTime.MaxValue, "lastFlushTime != DateTime.MaxValue");
                    Debug.Assert(discardedSinceLastFlush != -1, "discardedSinceLastFlush != -1");
                    Debug.Assert(eventsInBuffer != -1, "eventsInBuffer != -1");

                    Debug.Trace("WebEventBufferSummary", "_threadsInFlush=" + _threadsInFlush);

                    using (new ApplicationImpersonationContext()) {
                        try {
                            WebEventBufferFlushInfo flushInfo = new WebEventBufferFlushInfo(
                                                                    new WebBaseEventCollection(events),
                                                                    notificationType,
                                                                    Interlocked.Increment(ref _notificationSequence),
                                                                    lastFlushTime,
                                                                    discardedSinceLastFlush,
                                                                    eventsInBuffer);

                            _flushCallback(flushInfo);
                        }
                        catch (Exception e) {
                            try {
                                _provider.LogException(e);
                            }
                            catch {
                                // Ignore all errors
                            }
                        }
                        catch { // non compliant exceptions are caught and logged as Unknown
                            try {
                                _provider.LogException(new Exception(SR.GetString(SR.Provider_Error)));
                            }
                            catch {
                                // Ignore all errors
                            }
                        }
                    }
                }
                finally {
                    Interlocked.Decrement(ref _threadsInFlush);
                }
            }
        }

        // Appends webEvent to the buffer.  When the buffer already holds
        // _maxBufferSize events, one queued event is dequeued and counted as
        // discarded to make room.  Once the buffer holds at least
        // _urgentFlushThreshold events, an urgent flush is requested.
        internal void AddEvent(WebBaseEvent webEvent) {
            lock(_buffer) {
#if DBG
                _totalAdded++;
#endif
                // Make room first if the buffer is at capacity.
                bool bufferIsFull = (_buffer.Count == _maxBufferSize);
                if (bufferIsFull) {
                    Debug.Trace("WebEventBuffer", "Buffer is full.  Need to remove one from the tail");
                    _buffer.Dequeue();
                    _discardedSinceLastFlush++;
#if DBG
                    _totalAbandoned++;
#endif
                }

                _buffer.Enqueue(webEvent);

                // Ask for an urgent flush once the threshold is reached.  Flush only
                // schedules the work on the timer here; it does not flush synchronously
                // for this reason code.
                bool thresholdReached = (_buffer.Count >= _urgentFlushThreshold);
                if (thresholdReached) {
                    Flush(_maxFlushSize, FlushCallReason.UrgentFlushThresholdExceeded);
                }

                // Flush (via AnticipateBurst) reads _lastAdd as the time of the previous
                // add, not this one, so it is updated only after Flush has been called.
                _lastAdd = DateTime.UtcNow;
            }   // Release the lock
        }

        // Disposes the flush timer, if any, so no further timer-driven flushes are
        // started by it.  Events still in the buffer are not flushed here.
        internal void Shutdown() {
            // _timer is read and written under lock(_buffer) by Flush, SetTimer and
            // AddEvent; take the same lock so the timer is not disposed and cleared
            // while another thread is in the middle of re-arming or disposing it.
            lock (_buffer) {
                if (_timer != null) {
                    _timer.Dispose();
                    _timer = null;
                }
            }
        }

        // Formats t for trace output as the invariant long time pattern followed by
        // a dot and the millisecond component padded to three digits.
        string PrintTime(DateTime t) {
            string timeOfDay = t.ToString("T", DateTimeFormatInfo.InvariantInfo);
            string millis = t.Millisecond.ToString("d03", CultureInfo.InvariantCulture);
            return String.Concat(timeOfDay, ".", millis);
        }

 
#if DBG 
        // Debug-only bookkeeping, called from Flush while the buffer lock is held.
        // Verifies the added/abandoned/flushed/buffered accounting, adds toFlush to
        // the flushed total and, for timer-driven calls, traces a summary and records
        // when the last regular or urgent flush happened.
        void DebugUpdateStats(bool flushThisTime, DateTime now, int toFlush, FlushCallReason reason) {
            // Every event ever added must be either abandoned, flushed, or still buffered.
            Debug.Assert(_totalAdded == _totalAbandoned + _totalFlushed + _buffer.Count,
                    "_totalAdded == _totalAbandoned + _totalFlushed + _buffer.Count");

            _totalFlushed += toFlush;

            // The remaining statistics only concern timer callbacks.
            bool calledByTimer = (reason == FlushCallReason.Timer);
            if (!calledByTimer) {
                return;
            }

            Debug.Trace("WebEventBufferSummary",
                "_Added=" + _totalAdded + "; deleted=" + _totalAbandoned +
                "; Flushed=" + _totalFlushed + "; buffer=" + _buffer.Count +
                "; toFlush=" + toFlush +
                "; lastFlush=" + PrintTime(_lastRegularFlush) +
                "; lastUrgentFlush=" + PrintTime(_lastUrgentFlush) +
                "; GetRegFlushDueTime=" + GetNextRegularFlushDueTimeInMs() +
                "; toFlush=" + toFlush +
                "; now=" + PrintTime(now));

            if (_regularTimeoutUsed) {
                // Regular callback: just remember when it happened.  (The original
                // source carried a disabled assertion here that consecutive regular
                // flushes are within 2000 ms of _regularFlushIntervalMs apart.)
                _lastRegularFlush = now;
                return;
            }

            // Urgent callback that is not going to flush: nothing to record.
            if (!flushThisTime) {
                return;
            }

            // Urgent flushes must be at least _urgentFlushIntervalMs apart, allowing
            // 499 ms of slack for a timer that fires early.
            Debug.Assert((now - _lastUrgentFlush).TotalMilliseconds + 499 > _urgentFlushIntervalMs,
                "(now - _lastUrgentFlush).TotalMilliseconds + 499 > _urgentFlushIntervalMs" +
                "\n_lastUrgentFlush=" + PrintTime(_lastUrgentFlush) +
                "\nnow=" + PrintTime(now) +
                "\n_urgentFlushIntervalMs=" + _urgentFlushIntervalMs);

            _lastUrgentFlush = now;
        }

#endif
    } 
}
 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK