AsyncStreamReader.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Services / Monitoring / system / Diagnosticts / AsyncStreamReader.cs / 1305376 / AsyncStreamReader.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
/*============================================================
** 
** Class:  AsyncStreamReader 
**
** Purpose: For reading text from streams using a particular 
** encoding in an asychronous manner used by the process class
**
**
===========================================================*/ 

 
namespace System.Diagnostics { 
    using System;
    using System.IO; 
    using System.Text;
    using System.Runtime.InteropServices;
    using System.Threading;
    using System.Collections; 

    internal delegate void UserCallBack(String data); 
 
    internal class AsyncStreamReader : IDisposable
    { 
        internal const int DefaultBufferSize = 1024;  // Byte buffer size
        private const int MinBufferSize = 128;

        private Stream stream; 
        private Encoding encoding;
        private Decoder decoder; 
        private byte[] byteBuffer; 
        private char[] charBuffer;
        // Record the number of valid bytes in the byteBuffer, for a few checks. 

        // This is the maximum number of chars we can get from one call to
        // ReadBuffer.  Used so ReadBuffer can tell when to copy data into
        // a user's char[] directly, instead of our internal char[]. 
        private int _maxCharsPerBuffer;
 
        // Store a backpointer to the process class, to check for user callbacks 
        private Process process;
 
        // Delegate to call user function.
        private UserCallBack userCallBack;

        // Internal Cancel operation 
        private bool cancelOperation;
        private ManualResetEvent eofEvent; 
        private Queue messageQueue; 
        private StringBuilder sb;
        private bool bLastCarriageReturn; 
 		
        internal AsyncStreamReader(Process process, Stream stream, UserCallBack callback, Encoding encoding)
            : this(process, stream, callback, encoding, DefaultBufferSize) {
        } 

 
        // Creates a new AsyncStreamReader for the given stream.  The 
        // character encoding is set by encoding and the buffer size,
        // in number of 16-bit characters, is set by bufferSize. 
        //
        internal AsyncStreamReader(Process process, Stream stream, UserCallBack callback,  Encoding encoding, int bufferSize)
        {
            Debug.Assert (process != null && stream !=null && encoding !=null && callback != null, "Invalid arguments!"); 
            Debug.Assert(stream.CanRead, "Stream must be readable!");
            Debug.Assert(bufferSize > 0, "Invalid buffer size!"); 
 
            Init(process, stream, callback, encoding, bufferSize);
            messageQueue = new Queue(); 
        }

        private void Init(Process process, Stream stream, UserCallBack callback, Encoding encoding, int bufferSize) {
            this.process = process; 
            this.stream = stream;
            this.encoding = encoding; 
            this.userCallBack = callback; 
            decoder = encoding.GetDecoder();
            if (bufferSize < MinBufferSize) bufferSize = MinBufferSize; 
            byteBuffer = new byte[bufferSize];
            _maxCharsPerBuffer = encoding.GetMaxCharCount(bufferSize);
            charBuffer = new char[_maxCharsPerBuffer];
            cancelOperation = false; 
            eofEvent = new ManualResetEvent(false);
            sb = null; 
            this.bLastCarriageReturn = false; 
        }
 
        public virtual void Close()
        {
            Dispose(true);
        } 

        void IDisposable.Dispose() { 
            Dispose(true); 
            GC.SuppressFinalize(this);
        } 

        protected virtual void Dispose(bool disposing)
        {
            if (disposing) { 
                if (stream != null)
                    stream.Close(); 
            } 
            if (stream != null) {
                stream = null; 
                encoding = null;
                decoder = null;
                byteBuffer = null;
                charBuffer = null; 
            }
 
            if( eofEvent != null) { 
                eofEvent.Close();
                eofEvent = null; 
            }
        }

        public virtual Encoding CurrentEncoding { 
            get { return encoding; }
        } 
 
        public virtual Stream BaseStream {
            get { return stream; } 
        }

        // User calls BeginRead to start the asynchronous read
        internal void BeginReadLine() { 
            if( cancelOperation) {
                cancelOperation = false; 
            } 

            if( sb == null ) { 
                sb = new StringBuilder(DefaultBufferSize);
                stream.BeginRead(byteBuffer, 0 , byteBuffer.Length,  new AsyncCallback(ReadBuffer), null);
            }
            else { 
                FlushMessageQueue();
            } 
        } 

        internal void CancelOperation() { 
            cancelOperation = true;
        }

        // This is the async callback function. Only one thread could/should call this. 
        private void ReadBuffer(IAsyncResult ar) {
 
            int byteLen; 

            try { 
                byteLen = stream.EndRead(ar);
            }
            catch (IOException ) {
                // We should ideally consume errors from operations getting cancelled 
                // so that we don't crash the unsuspecting parent with an unhandled exc.
                // This seems to come in 2 forms of exceptions (depending on platform and scenario), 
                // namely OperationCanceledException and IOException (for errorcode that we don't 
                // map explicitly).
                byteLen = 0; // Treat this as EOF 
            }
            catch (OperationCanceledException ) {
                // We should consume any OperationCanceledException from child read here
                // so that we don't crash the parent with an unhandled exc 
                byteLen = 0; // Treat this as EOF
            } 
 
            if (byteLen == 0) {
                // We're at EOF, we won't call this function again from here on. 
                lock(messageQueue) {
                    if( sb.Length != 0) {
                        messageQueue.Enqueue(sb.ToString());
                        sb.Length = 0; 
                    }
                    messageQueue.Enqueue(null); 
                } 

                try { 
                    // UserCallback could throw, we should still set the eofEvent
                    FlushMessageQueue();
                }
                finally { 
                    eofEvent.Set();
                } 
            } else { 
                int charLen = decoder.GetChars(byteBuffer, 0, byteLen, charBuffer, 0);
                sb.Append(charBuffer, 0, charLen); 
                GetLinesFromStringBuilder();
                stream.BeginRead(byteBuffer, 0 , byteBuffer.Length,  new AsyncCallback(ReadBuffer), null);
            }
        } 

 
        // Read lines stored in StringBuilder and the buffer we just read into. 
        // A line is defined as a sequence of characters followed by
        // a carriage return ('\r'), a line feed ('\n'), or a carriage return 
        // immediately followed by a line feed. The resulting string does not
        // contain the terminating carriage return and/or line feed. The returned
        // value is null if the end of the input stream has been reached.
        // 

        private void GetLinesFromStringBuilder() { 
            int i = 0; 
            int lineStart = 0;
            int len = sb.Length; 

            // skip a beginning '\n' character of new block if last block ended
            // with '\r'
            if (bLastCarriageReturn && (len > 0) && sb[0] == '\n') 
            {
                i = 1; 
                lineStart = 1; 
                bLastCarriageReturn = false;
            } 
		
            while(i < len) {
                char ch = sb[i];
                // Note the following common line feed chars: 
                // \n - UNIX   \r\n - DOS   \r - Mac
                if (ch == '\r' || ch == '\n') { 
                    string s = sb.ToString(lineStart, i - lineStart); 
                    lineStart = i + 1;
                    // skip the "\n" character following "\r" character 
                    if ((ch == '\r') && (lineStart < len) && (sb[lineStart] == '\n'))
                    {
                        lineStart++;
                        i++; 
                    }
 
                    lock(messageQueue) { 
                        messageQueue.Enqueue(s);
                    } 
                }
                i++;
            }
            if (sb[len - 1] == '\r') { 
                bLastCarriageReturn = true;
            } 
            // Keep the rest characaters which can't form a new line in string builder. 
            if( lineStart < len) {
                sb.Remove(0, lineStart); 
            }
            else {
                sb.Length = 0;
            } 

            FlushMessageQueue(); 
        } 

        private void FlushMessageQueue() { 
            while(true) {

                // When we call BeginReadLine, we also need to flush the queue
                // So there could be a ---- between the ReadBuffer and BeginReadLine 
                // We need to take lock before DeQueue.
                if( messageQueue.Count > 0) { 
                    lock(messageQueue) { 
                        if( messageQueue.Count > 0) {
                            string s = (string)messageQueue.Dequeue(); 
                            // skip if the read is the read is cancelled
                            // this might happen inside UserCallBack
                            // However, continue to drain the queue
                            if (!cancelOperation) 
                            {
                                userCallBack(s); 
                            }							 
                        }
                    } 
                }
                else {
                    break;
                } 
            }
        } 
 
        // Wait until we hit EOF. This is called from Process.WaitForExit
        // We will lose some information if we don't do this. 
        internal void WaitUtilEOF() {
            if( eofEvent != null) {
                eofEvent.WaitOne();
                eofEvent.Close(); 
                eofEvent = null;
            } 
        } 
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
/*============================================================
** 
** Class:  AsyncStreamReader 
**
** Purpose: For reading text from streams using a particular 
** encoding in an asychronous manner used by the process class
**
**
===========================================================*/ 

 
namespace System.Diagnostics { 
    using System;
    using System.IO; 
    using System.Text;
    using System.Runtime.InteropServices;
    using System.Threading;
    using System.Collections; 

    internal delegate void UserCallBack(String data); 
 
    internal class AsyncStreamReader : IDisposable
    { 
        internal const int DefaultBufferSize = 1024;  // Byte buffer size
        private const int MinBufferSize = 128;

        private Stream stream; 
        private Encoding encoding;
        private Decoder decoder; 
        private byte[] byteBuffer; 
        private char[] charBuffer;
        // Record the number of valid bytes in the byteBuffer, for a few checks. 

        // This is the maximum number of chars we can get from one call to
        // ReadBuffer.  Used so ReadBuffer can tell when to copy data into
        // a user's char[] directly, instead of our internal char[]. 
        private int _maxCharsPerBuffer;
 
        // Store a backpointer to the process class, to check for user callbacks 
        private Process process;
 
        // Delegate to call user function.
        private UserCallBack userCallBack;

        // Internal Cancel operation 
        private bool cancelOperation;
        private ManualResetEvent eofEvent; 
        private Queue messageQueue; 
        private StringBuilder sb;
        private bool bLastCarriageReturn; 
 		
        internal AsyncStreamReader(Process process, Stream stream, UserCallBack callback, Encoding encoding)
            : this(process, stream, callback, encoding, DefaultBufferSize) {
        } 

 
        // Creates a new AsyncStreamReader for the given stream.  The 
        // character encoding is set by encoding and the buffer size,
        // in number of 16-bit characters, is set by bufferSize. 
        //
        internal AsyncStreamReader(Process process, Stream stream, UserCallBack callback,  Encoding encoding, int bufferSize)
        {
            Debug.Assert (process != null && stream !=null && encoding !=null && callback != null, "Invalid arguments!"); 
            Debug.Assert(stream.CanRead, "Stream must be readable!");
            Debug.Assert(bufferSize > 0, "Invalid buffer size!"); 
 
            Init(process, stream, callback, encoding, bufferSize);
            messageQueue = new Queue(); 
        }

        private void Init(Process process, Stream stream, UserCallBack callback, Encoding encoding, int bufferSize) {
            this.process = process; 
            this.stream = stream;
            this.encoding = encoding; 
            this.userCallBack = callback; 
            decoder = encoding.GetDecoder();
            if (bufferSize < MinBufferSize) bufferSize = MinBufferSize; 
            byteBuffer = new byte[bufferSize];
            _maxCharsPerBuffer = encoding.GetMaxCharCount(bufferSize);
            charBuffer = new char[_maxCharsPerBuffer];
            cancelOperation = false; 
            eofEvent = new ManualResetEvent(false);
            sb = null; 
            this.bLastCarriageReturn = false; 
        }
 
        public virtual void Close()
        {
            Dispose(true);
        } 

        void IDisposable.Dispose() { 
            Dispose(true); 
            GC.SuppressFinalize(this);
        } 

        protected virtual void Dispose(bool disposing)
        {
            if (disposing) { 
                if (stream != null)
                    stream.Close(); 
            } 
            if (stream != null) {
                stream = null; 
                encoding = null;
                decoder = null;
                byteBuffer = null;
                charBuffer = null; 
            }
 
            if( eofEvent != null) { 
                eofEvent.Close();
                eofEvent = null; 
            }
        }

        public virtual Encoding CurrentEncoding { 
            get { return encoding; }
        } 
 
        public virtual Stream BaseStream {
            get { return stream; } 
        }

        // User calls BeginRead to start the asynchronous read
        internal void BeginReadLine() { 
            if( cancelOperation) {
                cancelOperation = false; 
            } 

            if( sb == null ) { 
                sb = new StringBuilder(DefaultBufferSize);
                stream.BeginRead(byteBuffer, 0 , byteBuffer.Length,  new AsyncCallback(ReadBuffer), null);
            }
            else { 
                FlushMessageQueue();
            } 
        } 

        internal void CancelOperation() { 
            cancelOperation = true;
        }

        // This is the async callback function. Only one thread could/should call this. 
        private void ReadBuffer(IAsyncResult ar) {
 
            int byteLen; 

            try { 
                byteLen = stream.EndRead(ar);
            }
            catch (IOException ) {
                // We should ideally consume errors from operations getting cancelled 
                // so that we don't crash the unsuspecting parent with an unhandled exc.
                // This seems to come in 2 forms of exceptions (depending on platform and scenario), 
                // namely OperationCanceledException and IOException (for errorcode that we don't 
                // map explicitly).
                byteLen = 0; // Treat this as EOF 
            }
            catch (OperationCanceledException ) {
                // We should consume any OperationCanceledException from child read here
                // so that we don't crash the parent with an unhandled exc 
                byteLen = 0; // Treat this as EOF
            } 
 
            if (byteLen == 0) {
                // We're at EOF, we won't call this function again from here on. 
                lock(messageQueue) {
                    if( sb.Length != 0) {
                        messageQueue.Enqueue(sb.ToString());
                        sb.Length = 0; 
                    }
                    messageQueue.Enqueue(null); 
                } 

                try { 
                    // UserCallback could throw, we should still set the eofEvent
                    FlushMessageQueue();
                }
                finally { 
                    eofEvent.Set();
                } 
            } else { 
                int charLen = decoder.GetChars(byteBuffer, 0, byteLen, charBuffer, 0);
                sb.Append(charBuffer, 0, charLen); 
                GetLinesFromStringBuilder();
                stream.BeginRead(byteBuffer, 0 , byteBuffer.Length,  new AsyncCallback(ReadBuffer), null);
            }
        } 

 
        // Read lines stored in StringBuilder and the buffer we just read into. 
        // A line is defined as a sequence of characters followed by
        // a carriage return ('\r'), a line feed ('\n'), or a carriage return 
        // immediately followed by a line feed. The resulting string does not
        // contain the terminating carriage return and/or line feed. The returned
        // value is null if the end of the input stream has been reached.
        // 

        private void GetLinesFromStringBuilder() { 
            int i = 0; 
            int lineStart = 0;
            int len = sb.Length; 

            // skip a beginning '\n' character of new block if last block ended
            // with '\r'
            if (bLastCarriageReturn && (len > 0) && sb[0] == '\n') 
            {
                i = 1; 
                lineStart = 1; 
                bLastCarriageReturn = false;
            } 
		
            while(i < len) {
                char ch = sb[i];
                // Note the following common line feed chars: 
                // \n - UNIX   \r\n - DOS   \r - Mac
                if (ch == '\r' || ch == '\n') { 
                    string s = sb.ToString(lineStart, i - lineStart); 
                    lineStart = i + 1;
                    // skip the "\n" character following "\r" character 
                    if ((ch == '\r') && (lineStart < len) && (sb[lineStart] == '\n'))
                    {
                        lineStart++;
                        i++; 
                    }
 
                    lock(messageQueue) { 
                        messageQueue.Enqueue(s);
                    } 
                }
                i++;
            }
            if (sb[len - 1] == '\r') { 
                bLastCarriageReturn = true;
            } 
            // Keep the rest characaters which can't form a new line in string builder. 
            if( lineStart < len) {
                sb.Remove(0, lineStart); 
            }
            else {
                sb.Length = 0;
            } 

            FlushMessageQueue(); 
        } 

        private void FlushMessageQueue() { 
            while(true) {

                // When we call BeginReadLine, we also need to flush the queue
                // So there could be a ---- between the ReadBuffer and BeginReadLine 
                // We need to take lock before DeQueue.
                if( messageQueue.Count > 0) { 
                    lock(messageQueue) { 
                        if( messageQueue.Count > 0) {
                            string s = (string)messageQueue.Dequeue(); 
                            // skip if the read is the read is cancelled
                            // this might happen inside UserCallBack
                            // However, continue to drain the queue
                            if (!cancelOperation) 
                            {
                                userCallBack(s); 
                            }							 
                        }
                    } 
                }
                else {
                    break;
                } 
            }
        } 
 
        // Wait until we hit EOF. This is called from Process.WaitForExit
        // We will lose some information if we don't do this. 
        internal void WaitUtilEOF() {
            if( eofEvent != null) {
                eofEvent.WaitOne();
                eofEvent.Close(); 
                eofEvent = null;
            } 
        } 
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK