BufferedOutputAsyncStream.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / BufferedOutputAsyncStream.cs / 1 / BufferedOutputAsyncStream.cs

                            //------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------
namespace System.ServiceModel.Channels
{ 

    using System; 
    using System.Diagnostics; 
    using System.Globalization;
    using System.IO; 
    using System.ServiceModel;
    using System.Collections;
    using System.Collections.Generic;
    using System.Threading; 

    ///  
    /// 
    /// BufferedOutputAsyncStream is used for writing streamed response.
    /// For performance reasons, the behavior we want is chunk, chunk, chunk,.. terminating chunk  without a delay. 
    /// We call BeginWrite,BeginWrite,BeginWrite and Close()(close sends the terminating chunk) without
    /// waiting for all outstanding BeginWrites to complete.
    ///
    /// BufferedOutputAsyncStream is not a general-purpose stream wrapper, it requires that the base stream 
    ///     1.	allow concurrent IO (for multiple BeginWrite calls)
    ///     2.	support the BeginWrite,BeginWrite,BeginWrite,.. Close() calling pattern. 
    /// 
    /// Currently BufferedOutputAsyncStream only used to wrap the System.Net.HttpResponseStream, which satisfy both requirements.
    /// 
    /// 
    class BufferedOutputAsyncStream : Stream
    {
        Stream stream; 
        int bufferSize;
        int bufferLimit; 
        List buffers; 
        int currentIndex;
 
        internal BufferedOutputAsyncStream(Stream stream, int bufferSize, int bufferLimit)
        {
            this.stream = stream;
            this.bufferSize = bufferSize; 
            this.bufferLimit = bufferLimit;
            this.buffers = new List(); 
            this.buffers.Add(new ByteBuffer(this.bufferSize, stream)); 
            this.currentIndex = 0;
        } 

        public override bool CanRead
        {
            get { return false; } 
        }
 
        public override bool CanSeek 
        {
            get { return false; } 
        }

        public override bool CanWrite
        { 
            get { return stream.CanWrite; }
        } 
 
        ByteBuffer CurrentBuffer
        { 
            get { return buffers[currentIndex]; }
        }

        public override long Length 
        {
            get 
            { 
#pragma warning suppress 56503 // [....], required by the Stream.Length contract
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported))); 
            }
        }

        public override long Position 
        {
            get 
            { 
#pragma warning suppress 56503 // [....], required by the Stream.Position contract
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported))); 
            }
            set
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported))); 
            }
        } 
 
        public override void Close()
        { 
            CurrentBuffer.Flush();
            stream.Close();

            // Complete all outstanding writes 
            WaitForAllWritesToComplete();
        } 
 
        public override void Flush()
        { 
            CurrentBuffer.Flush();
            stream.Flush();
        }
 
        void NextBuffer()
        { 
            currentIndex++; 
            if (currentIndex == buffers.Count)
            { 
                if (buffers.Count < bufferLimit)
                {
                    // allocate new buffer
                    buffers.Add(new ByteBuffer(bufferSize, stream)); 
                    return;
                } 
                currentIndex = 0; 
            }
            DiagnosticUtility.DebugAssert(currentIndex >= 0 && currentIndex < buffers.Count, string.Format(CultureInfo.InvariantCulture, "The value ({0}) must be greater than or equal to zero and less than {1}", currentIndex, buffers.Count)); 

            // Make Sure that the buffer is ready to receive data
            CurrentBuffer.WaitForWriteComplete();
        } 

        public override int Read(byte[] buffer, int offset, int count) 
        { 
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
        } 

        public override int ReadByte()
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported))); 
        }
 
        public override long Seek(long offset, SeekOrigin origin) 
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported))); 
        }

        public override void SetLength(long value)
        { 
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
        } 
 
        void WaitForAllWritesToComplete()
        { 
            // Complete all outstanding writes
            for (int i = 0; i < buffers.Count; i++)
            {
                buffers[i].WaitForWriteComplete(); 
            }
        } 
 
        public override void Write(byte[] buffer, int offset, int count)
        { 
            while (count > 0)
            {
                if (CurrentBuffer.IsWritePending)
                { 
                    NextBuffer();
                } 
                int freeBytes = CurrentBuffer.FreeBytes;   // space left in the CurrentBuffer 
                if (freeBytes > 0)
                { 
                    if (freeBytes > count)
                        freeBytes = count;

                    CurrentBuffer.CopyData(buffer, offset, freeBytes); 
                    offset += freeBytes;
                    count -= freeBytes; 
                } 
                if (CurrentBuffer.FreeBytes == 0)
                { 
                    CurrentBuffer.Flush();
                }
            }
        } 

        public override void WriteByte(byte value) 
        { 
            if (CurrentBuffer.IsWritePending)
            { 
                NextBuffer();
            }
            CurrentBuffer.CopyData(value);
            if (CurrentBuffer.FreeBytes == 0) 
            {
                CurrentBuffer.Flush(); 
            } 
        }
 
        class ByteBuffer
        {
            byte[] bytes;
            int position = 0; 
            Stream stream;
            static AsyncCallback writeCallback = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(WriteCallback)); 
            bool writePending = false; 
            bool waiting = false;
            Exception completionException; 

            internal ByteBuffer(int bufferSize, Stream stream)
            {
                this.bytes = DiagnosticUtility.Utility.AllocateByteArray(bufferSize); 
                this.stream = stream;
            } 
 
            internal bool IsWritePending
            { 
                get { return writePending; }
            }

            object ThisLock 
            {
                get { return this; } 
            } 

            internal int FreeBytes { get { return this.bytes.Length - this.position; } } 

            internal void Flush()
            {
                if (this.position <= 0) 
                    return;
 
                int bytesToWrite = this.position; 
                this.writePending = true;
                this.position = 0; 
                IAsyncResult asyncResult = this.stream.BeginWrite(this.bytes, 0, bytesToWrite, writeCallback, this);
                if (asyncResult.CompletedSynchronously)
                {
                    this.stream.EndWrite(asyncResult); 
                    this.writePending = false;
                } 
            } 

            static void WriteCallback(IAsyncResult result) 
            {
                if (result.CompletedSynchronously)
                    return;
 
                // Fetch our state information: ByteBuffer
                ByteBuffer buffer = (ByteBuffer)result.AsyncState; 
                try 
                {
                    buffer.stream.EndWrite(result); 
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e)
                { 
                    if (DiagnosticUtility.IsFatal(e))
                    { 
                        throw; 
                    }
                    buffer.completionException = e; 
                }

                // Tell the main thread we've finished.
                lock (buffer.ThisLock) 
                {
                    buffer.writePending = false; 
 
                    // Do not Pulse if no one is waiting, to avoid the overhead of Pulse
                    if (!buffer.waiting) 
                        return;

                    Monitor.Pulse(buffer.ThisLock);
                } 
            }
 
            internal void WaitForWriteComplete() 
            {
                lock (ThisLock) 
                {
                    if (this.writePending)
                    {
                        // Wait until the async write of this buffer is finished. 
                        this.waiting = true;
                        Monitor.Wait(ThisLock); 
                        this.waiting = false; 
                    }
                } 
                // Raise exception if necessary
                if (this.completionException != null)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(completionException); 
                }
            } 
 
            internal void CopyData(byte[] buffer, int offset, int count)
            { 
                DiagnosticUtility.DebugAssert(this.position + count <= this.bytes.Length, string.Format(CultureInfo.InvariantCulture, "Chunk is too big to fit in this buffer. Chunk size={0}, free space={1}", count, this.bytes.Length - this.position));
                DiagnosticUtility.DebugAssert(!this.writePending, string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position));

                Buffer.BlockCopy(buffer, offset, this.bytes, this.position, count); 
                this.position += count;
            } 
 
            internal void CopyData(byte value)
            { 
                DiagnosticUtility.DebugAssert(this.position < this.bytes.Length, "Buffer is full");
                DiagnosticUtility.DebugAssert(!this.writePending, string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position));

                this.bytes[this.position++] = value; 
            }
        } 
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK