_CacheStreams.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Net / System / Net / Cache / _CacheStreams.cs / 1305376 / _CacheStreams.cs

                            /*++ 
Copyright (c) Microsoft Corporation

Module Name:
 
    _CacheStreams.cs
 
Abstract: 
    The file contains two streams used in conjunction with caching.
    The first class will combine two streams for reading into just one continued stream. 
    The second class will forward (as writes) to external stream all reads issued on a "this" stream.

Author:
 
    Alexei Vopilov    21-Dec-2002
 
Revision History: 

--*/ 
namespace System.Net.Cache {
 	using System;
	using System.Net;
	using System.IO; 
	using System.Threading;
 	using System.Collections.Specialized; 
    using System.Diagnostics; 

 
    internal abstract class BaseWrapperStream : Stream, IRequestLifetimeTracker
    {
        private Stream m_WrappedStream;
 
        protected Stream WrappedStream
        { 
            get { return m_WrappedStream; } 
        }
 
        public BaseWrapperStream(Stream wrappedStream)
        {
            Debug.Assert(wrappedStream != null);
            m_WrappedStream = wrappedStream; 
        }
 
        public void TrackRequestLifetime(long requestStartTimestamp) 
        {
            IRequestLifetimeTracker stream = m_WrappedStream as IRequestLifetimeTracker; 
            Debug.Assert(stream != null, "Wrapped stream must implement IRequestLifetimeTracker interface");
            stream.TrackRequestLifetime(requestStartTimestamp);
        }
    } 

    // 
    // This stream will take two Streams (head and tail) and combine them into a single stream 
    // Only read IO is supported!
    // 
    internal class CombinedReadStream : BaseWrapperStream, ICloseEx {
        private Stream  m_HeadStream;
        private bool    m_HeadEOF;
        private long    m_HeadLength; 
        private int     m_ReadNesting;
        private AsyncCallback m_ReadCallback;   //lazy initialized 
 

        internal CombinedReadStream(Stream headStream, Stream tailStream) 
            : base(tailStream)
        {
            m_HeadStream = headStream;
            m_HeadEOF = headStream == Stream.Null; 
        }
 
        public override bool CanRead { 
            get {return m_HeadEOF? WrappedStream.CanRead: m_HeadStream.CanRead;}
        } 

        // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
        public override bool CanSeek {
            get {return false;} 
        }
 
        public override bool CanWrite { 
            get {return false;}
        } 

        public override long Length {
            get {
                return WrappedStream.Length + (m_HeadEOF? m_HeadLength: m_HeadStream.Length); 
            }
        } 
 
        public override long Position {
            get { 
                return WrappedStream.Position + (m_HeadEOF? m_HeadLength: m_HeadStream.Position);
            }

            set { 
                throw new NotSupportedException(SR.GetString(SR.net_noseek));
            } 
        } 

        public override long Seek(long offset, SeekOrigin origin) { 
            throw new NotSupportedException(SR.GetString(SR.net_noseek));
        }

        public override void SetLength(long value) { 
            throw new NotSupportedException(SR.GetString(SR.net_noseek));
        } 
 
        public override void Write(byte[] buffer, int offset, int count) {
            throw new NotSupportedException(SR.GetString(SR.net_noseek)); 
        }

        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) {
            throw new NotSupportedException(SR.GetString(SR.net_noseek)); 
        }
 
        public override void EndWrite(IAsyncResult asyncResult) { 
            throw new NotSupportedException(SR.GetString(SR.net_noseek));
        } 

        public override void Flush() {
        }
 
        public override int Read(byte[] buffer, int offset, int count) {
 
            try { 
                if (Interlocked.Increment(ref m_ReadNesting) != 1) {
                    throw new NotSupportedException(SR.GetString(SR.net_io_invalidnestedcall, "Read", "read")); 
                }

                if (m_HeadEOF) {
                    return WrappedStream.Read(buffer, offset, count); 
                }
                else { 
                    int result = m_HeadStream.Read(buffer, offset, count); 
                    m_HeadLength += result;
                    if (result == 0 && count != 0) { 
                        m_HeadEOF = true;
                        m_HeadStream.Close();
                        result = WrappedStream.Read(buffer, offset, count);
                    } 
                    return result;
                } 
            } 
            finally {
                Interlocked.Decrement(ref m_ReadNesting); 
            }

        }
 

        // 
        // This is a wrapper result used to substitue the AsyncResult returned from m_HeadStream IO 
        // Note that once seen a EOF on m_HeadStream we will stop using this wrapper.
        // 
        private class InnerAsyncResult: LazyAsyncResult {
            public byte[] Buffer;
            public int    Offset;
            public int    Count; 

            public InnerAsyncResult(object userState, AsyncCallback userCallback, byte[] buffer, int offset, int count) 
            :base (null, userState, userCallback) { 

                Buffer = buffer; 
                Offset = offset;
                Count  = count;
            }
 
        }
 
        private void ReadCallback(IAsyncResult transportResult) { 
            GlobalLog.Assert(transportResult.AsyncState is InnerAsyncResult, "InnerAsyncResult::ReadCallback|The state expected to be of type InnerAsyncResult, received {0}.", transportResult.GetType().FullName);
            if (transportResult.CompletedSynchronously) 
            {
                return;
            }
 
            InnerAsyncResult userResult = transportResult.AsyncState as InnerAsyncResult;
            try { 
                // Complete transport IO, in this callback that is always the head stream 
                int count;
                if (!m_HeadEOF) { 
                    count = m_HeadStream.EndRead(transportResult);
                    m_HeadLength += count;
                }
                else { 
                    count = WrappedStream.EndRead(transportResult);
                } 
 

                //check on EOF condition 
                if (!m_HeadEOF && count == 0 && userResult.Count != 0) {
                    //Got a first stream EOF
                    m_HeadEOF = true;
                    m_HeadStream.Close(); 
                    IAsyncResult ar = WrappedStream.BeginRead(userResult.Buffer, userResult.Offset, userResult.Count, m_ReadCallback, userResult);
                    if (!ar.CompletedSynchronously) { 
                        return; 
                    }
                    count = WrappedStream.EndRead(ar); 
                }
                // just complete user IO
                userResult.Buffer = null;
                userResult.InvokeCallback(count); 
            }
            catch (Exception e) { 
                //ASYNC: try to ignore even serious exceptions (nothing to loose?) 
                if (userResult.InternalPeekCompleted)
                    throw; 

                userResult.InvokeCallback(e);
            }
        } 

        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { 
            try { 
                if (Interlocked.Increment(ref m_ReadNesting) != 1) {
                    throw new NotSupportedException(SR.GetString(SR.net_io_invalidnestedcall, "BeginRead", "read")); 
                }

                if (m_ReadCallback == null) {
                    m_ReadCallback = new AsyncCallback(ReadCallback); 
                }
 
                if (m_HeadEOF) { 
                    return WrappedStream.BeginRead(buffer, offset, count, callback, state);
                } 
                else {
                    InnerAsyncResult userResult = new InnerAsyncResult(state, callback, buffer, offset, count);
                    IAsyncResult ar = m_HeadStream.BeginRead(buffer, offset, count, m_ReadCallback, userResult);
 
                    if (!ar.CompletedSynchronously)
                    { 
                        return userResult; 
                    }
 
                    int bytes = m_HeadStream.EndRead(ar);
                    m_HeadLength += bytes;

                    //check on EOF condition 
                    if (bytes == 0 && userResult.Count != 0) {
                        //Got a first stream EOF 
                        m_HeadEOF = true; 
                        m_HeadStream.Close();
                        return WrappedStream.BeginRead(buffer, offset, count, callback, state); 
                    }
                    else {
                        // just complete user IO
                        userResult.Buffer = null; 
                        userResult.InvokeCallback(count);
                        return userResult; 
                    } 

                } 
            }
            catch {
                Interlocked.Decrement(ref m_ReadNesting);
                throw; 
            }
        } 
 
        public override int EndRead(IAsyncResult asyncResult) {
 
            if (Interlocked.Decrement(ref m_ReadNesting) != 0) {
                Interlocked.Increment(ref m_ReadNesting);
                throw new InvalidOperationException(SR.GetString(SR.net_io_invalidendcall, "EndRead"));
            } 

            if (asyncResult == null) { 
                throw new ArgumentNullException("asyncResult"); 
            }
 
            InnerAsyncResult myResult = asyncResult as InnerAsyncResult;

            if (myResult == null) {
                // We are just passing IO down, although m_HeadEOF should be always true here. 
                GlobalLog.Assert(m_HeadEOF, "CombinedReadStream::EndRead|m_HeadEOF is false and asyncResult is not of InnerAsyncResult type {0).", asyncResult.GetType().FullName);
                return m_HeadEOF? WrappedStream.EndRead(asyncResult): m_HeadStream.EndRead(asyncResult); 
            } 

            // this is our wrapped AsyncResult 
            myResult.InternalWaitForCompletion();

            // Exception?
            if (myResult.Result is Exception) { 
                throw (Exception)(myResult.Result);
            } 
 
            // Report the count read
            return (int)myResult.Result; 
        }

        // Subclasses should use Dispose(bool, CloseExState)
        protected override sealed void Dispose(bool disposing) { 
            Dispose(disposing, CloseExState.Normal);
        } 
 
        void ICloseEx.CloseEx(CloseExState closeState) {
            Dispose(true, closeState); 
        }

        protected virtual void Dispose(bool disposing, CloseExState closeState) {
 
            // All below calls should already be idempotent
 
            try { 
                if (disposing) {
                    try { 
                        if (!m_HeadEOF) {
                            ICloseEx icloseEx = m_HeadStream as ICloseEx;
                            if (icloseEx != null) {
                                icloseEx.CloseEx(closeState); 
                            }
                            else { 
                                m_HeadStream.Close(); 
                            }
                        } 
                    }
                    finally {
                        ICloseEx icloseEx = WrappedStream as ICloseEx;
                        if (icloseEx != null) { 
                            icloseEx.CloseEx(closeState);
                        } 
                        else { 
                            WrappedStream.Close();
                        } 
                    }
                }
            }
            finally { 
                base.Dispose(disposing);
            } 
        } 

        public override bool CanTimeout { 
            get {
                return WrappedStream.CanTimeout && m_HeadStream.CanTimeout;
            }
        } 

        public override int ReadTimeout { 
            get { 
                return (m_HeadEOF) ? WrappedStream.ReadTimeout : m_HeadStream.ReadTimeout;
            } 
            set {
                WrappedStream.ReadTimeout = m_HeadStream.ReadTimeout = value;
            }
        } 

        public override int WriteTimeout { 
            get { 
                return (m_HeadEOF) ? WrappedStream.WriteTimeout : m_HeadStream.WriteTimeout;
            } 
            set {
                WrappedStream.WriteTimeout = m_HeadStream.WriteTimeout = value;
            }
        } 
    }
 
    // 
    // This stream will plug into a stream and listen for all reads on it
    // It is also constructed with yet another stream used for multiplexing IO to 
    //
    // When it sees a read on this stream the result gets forwarded as write to a shadow stream.
    // ONLY READ IO is supported!
    // 
    internal class ForwardingReadStream : BaseWrapperStream, ICloseEx {
        private Stream  m_ShadowStream; 
        private int     m_ReadNesting; 
        private bool    m_ShadowStreamIsDead;
        private AsyncCallback m_ReadCallback;   // lazy initialized 
        private long    m_BytesToSkip;       // suppress from the read first number of bytes
        private bool    m_ThrowOnWriteError;
        private bool    m_SeenReadEOF;
 

        internal ForwardingReadStream(Stream originalStream, Stream shadowStream, long bytesToSkip, bool throwOnWriteError) 
            : base(originalStream) 
        {
            if (!shadowStream.CanWrite) { 
                throw new ArgumentException(SR.GetString(SR.net_cache_shadowstream_not_writable), "shadowStream");
            }
            m_ShadowStream = shadowStream;
            m_BytesToSkip = bytesToSkip; 
            m_ThrowOnWriteError = throwOnWriteError;
        } 
 
        public override bool CanRead {
            get {return WrappedStream.CanRead;} 
        }

        // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
        public override bool CanSeek { 
            get {return false;}
        } 
 
        public override bool CanWrite {
            get {return false;} 
        }

        public override long Length {
            get { 
                return WrappedStream.Length - m_BytesToSkip;
            } 
        } 

        public override long Position { 
            get {
                return WrappedStream.Position - m_BytesToSkip;
            }
 
            set {
                throw new NotSupportedException(SR.GetString(SR.net_noseek)); 
            } 
        }
 
        public override long Seek(long offset, SeekOrigin origin) {
            throw new NotSupportedException(SR.GetString(SR.net_noseek));
        }
 
        public override void SetLength(long value) {
            throw new NotSupportedException(SR.GetString(SR.net_noseek)); 
        } 

        public override void Write(byte[] buffer, int offset, int count) { 
            throw new NotSupportedException(SR.GetString(SR.net_noseek));
        }

        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { 
            throw new NotSupportedException(SR.GetString(SR.net_noseek));
        } 
 
        public override void EndWrite(IAsyncResult asyncResult) {
            throw new NotSupportedException(SR.GetString(SR.net_noseek)); 
        }

        public override void Flush() {
        } 

        public override int Read(byte[] buffer, int offset, int count) { 
 
            bool isDoingWrite = false;
            int result = -1; 
            if (Interlocked.Increment(ref m_ReadNesting) != 1) {
                throw new NotSupportedException(SR.GetString(SR.net_io_invalidnestedcall, "Read", "read"));
            }
 
            try {
 
                if (m_BytesToSkip != 0L) { 
                    // Sometime we want to combine cached + live stream AND the user requested explicit range starts from not 0
                    byte[] tempBuffer = new byte[4096]; 
                    while (m_BytesToSkip != 0L) {
                        int bytes = WrappedStream.Read(tempBuffer, 0, (m_BytesToSkip < (long)tempBuffer.Length? (int)m_BytesToSkip: tempBuffer.Length));
                        if (bytes == 0)
                            m_SeenReadEOF = true; 

                        m_BytesToSkip -= bytes; 
                        if (!m_ShadowStreamIsDead) 
                            m_ShadowStream.Write(tempBuffer, 0, bytes);
                    } 
                }

                result = WrappedStream.Read(buffer, offset, count);
                if (result == 0) 
                    m_SeenReadEOF = true;
 
                if (m_ShadowStreamIsDead) { 
                    return result;
                } 
                isDoingWrite = true;
                m_ShadowStream.Write(buffer, offset, result);
                return result;
            } 
            catch (Exception e) {
                if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException) 
                    throw; 

                GlobalLog.Print("ShadowReadStream::Read() Got Exception, disabling the shadow stream, stack trace = " + e.ToString()); 
                if (!m_ShadowStreamIsDead) {
                    // try to ignore even serious exception, since got nothing to loose?
                    m_ShadowStreamIsDead = true;
                    try { 
                        if (m_ShadowStream is ICloseEx)
                            ((ICloseEx)m_ShadowStream).CloseEx(CloseExState.Abort | CloseExState.Silent); 
                        else 
                            m_ShadowStream.Close();
                    } 
                    catch (Exception ee) {
                        if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException)
                            throw;
                        GlobalLog.Print("ShadowReadStream::Read() Got (ignoring) Exception, on shadow stream.Close, stack trace = " + ee.ToString()); 
                    }
                } 
                if (!isDoingWrite || m_ThrowOnWriteError) 
                    throw;
 
                return result;
            }
            finally {
                Interlocked.Decrement(ref m_ReadNesting); 
            }
        } 
 

        // 
        // This is a wrapper result used to substitue the AsyncResult returned from WrappedStream IO
        // Note that once seen a m_ShadowStream error we will stop using this wrapper.
        //
        private class InnerAsyncResult: LazyAsyncResult { 
            public byte[] Buffer;
            public int    Offset; 
            public int    Count; 
            public bool   IsWriteCompletion;
 
            public InnerAsyncResult(object userState, AsyncCallback userCallback, byte[] buffer, int offset, int count)
            :base (null, userState, userCallback) {

                Buffer = buffer; 
                Offset = offset;
                Count  = count; 
            } 

        } 

        private void ReadCallback(IAsyncResult transportResult) {
            GlobalLog.Assert(transportResult.AsyncState is InnerAsyncResult, "InnerAsyncResult::ReadCallback|The state expected to be of type InnerAsyncResult, received {0}.", transportResult.GetType().FullName);
            if (transportResult.CompletedSynchronously) 
            {
                return; 
            } 

            // Recover our asyncResult 
            InnerAsyncResult userResult = transportResult.AsyncState as InnerAsyncResult;

            ReadComplete(transportResult);
        } 

        private void ReadComplete(IAsyncResult transportResult) 
        { 
            while(true)
            { 
                // Recover our asyncResult
                InnerAsyncResult userResult = transportResult.AsyncState as InnerAsyncResult;

                try 
                {
                    if (!userResult.IsWriteCompletion) 
                    { 
                        userResult.Count = WrappedStream.EndRead(transportResult);
                        if (userResult.Count == 0) 
                            m_SeenReadEOF = true;


                        if (!m_ShadowStreamIsDead) { 
                            userResult.IsWriteCompletion = true;
                            //Optionally charge notification write IO 
                            transportResult = m_ShadowStream.BeginWrite(userResult.Buffer, userResult.Offset, userResult.Count, m_ReadCallback, userResult); 
                            if (transportResult.CompletedSynchronously)
                            { 
                                continue;
                            }
                            return;
                        } 
                    }
                    else 
                    { 
                        GlobalLog.Assert(!m_ShadowStreamIsDead, "ForwardingReadStream::ReadComplete|ERROR: IsWriteCompletion && m_ShadowStreamIsDead");
 
                        m_ShadowStream.EndWrite(transportResult);
                        userResult.IsWriteCompletion = false;
                    }
                } 
                catch (Exception e)
                { 
                    //ASYNC: try to ignore even serious exceptions (nothing to loose?) 
                    if (userResult.InternalPeekCompleted)
                    { 
                        GlobalLog.Print("ShadowReadStream::ReadComplete() Rethrowing Exception (end), userResult.IsCompleted, stack trace = " + e.ToString());
                        throw;
                    }
 
                    try
                    { 
                        m_ShadowStreamIsDead = true; 
                        if (m_ShadowStream is ICloseEx)
                            ((ICloseEx)m_ShadowStream).CloseEx(CloseExState.Abort | CloseExState.Silent); 
                        else
                            m_ShadowStream.Close();
                    }
                    catch (Exception ee) 
                    {
                        //ASYNC: Again try to ignore even serious exceptions 
                        GlobalLog.Print("ShadowReadStream::ReadComplete() Got (ignoring) Exception, on shadow stream.Close, stack trace = " + ee.ToString()); 
                    }
 
                    if (!userResult.IsWriteCompletion || m_ThrowOnWriteError)
                    {
                        if (transportResult.CompletedSynchronously)
                        { 
                            throw;
                        } 
 
                        userResult.InvokeCallback(e);
                        return; 
                    }
                }

                // Need to process, re-issue the read. 
                try
                { 
                    if (m_BytesToSkip != 0L) { 
                        m_BytesToSkip -= userResult.Count;
                        userResult.Count = m_BytesToSkip < (long)userResult.Buffer.Length? (int)m_BytesToSkip: userResult.Buffer.Length; 
                        if (m_BytesToSkip == 0L) {
                            // we did hide the original IO request in the outer iaresult state.
                            // charge the real user operation now
                            transportResult = userResult; 
                            userResult = userResult.AsyncState as InnerAsyncResult;
                            GlobalLog.Assert(userResult != null, "ForwardingReadStream::ReadComplete|ERROR: Inner IAResult is null after stream FastForwarding."); 
                        } 
                        transportResult = WrappedStream.BeginRead(userResult.Buffer, userResult.Offset, userResult.Count, m_ReadCallback, userResult);
                        if (transportResult.CompletedSynchronously) 
                        {
                            continue;
                        }
                        return; 
                    }
                    //if came to here, complete original user IO 
                    userResult.InvokeCallback(userResult.Count); 
                    return;
                } 
                catch (Exception e)
                {
                    //ASYNC: try to ignore even serious exceptions (nothing to loose?)
                    if (userResult.InternalPeekCompleted) 
                    {
                        GlobalLog.Print("ShadowReadStream::ReadComplete() Rethrowing Exception (begin), userResult.IsCompleted, stack trace = " + e.ToString()); 
                        throw; 
                    }
 
                    try
                    {
                        m_ShadowStreamIsDead = true;
                        if (m_ShadowStream is ICloseEx) 
                            ((ICloseEx)m_ShadowStream).CloseEx(CloseExState.Abort | CloseExState.Silent);
                        else 
                            m_ShadowStream.Close(); 
                    }
                    catch (Exception ee) 
                    {
                        //ASYNC: Again try to ignore even serious exceptions
                        GlobalLog.Print("ShadowReadStream::ReadComplete() Got (ignoring) Exception, on shadow stream.Close (after begin), stack trace = " + ee.ToString());
                    } 

                    if (transportResult.CompletedSynchronously) 
                    { 
                        throw;
                    } 

                    // This will set the exception result first then try to execute a user callback
                    userResult.InvokeCallback(e);
                    return; 
                }
            } 
        } 

        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { 
            if (Interlocked.Increment(ref m_ReadNesting) != 1) {
                throw new NotSupportedException(SR.GetString(SR.net_io_invalidnestedcall, "BeginRead", "read"));
            }
 
            try {
 
                if (m_ReadCallback == null) { 
                    m_ReadCallback = new AsyncCallback(ReadCallback);
                } 

                if (m_ShadowStreamIsDead && m_BytesToSkip == 0L) {
                    return WrappedStream.BeginRead(buffer, offset, count, callback, state);
                } 
                else {
                    InnerAsyncResult userResult = new InnerAsyncResult(state, callback, buffer, offset, count); 
                    if (m_BytesToSkip != 0L) { 
                        InnerAsyncResult temp = userResult;
                        userResult = new InnerAsyncResult(temp, null, new byte[4096], 
                                                          0, m_BytesToSkip < (long) buffer.Length? (int)m_BytesToSkip: buffer.Length);
                    }
                    IAsyncResult result = WrappedStream.BeginRead(userResult.Buffer, userResult.Offset, userResult.Count, m_ReadCallback, userResult);
                    if (result.CompletedSynchronously) 
                    {
                        ReadComplete(result); 
                    } 
                    return userResult;
                } 
            }
            catch {
                Interlocked.Decrement(ref m_ReadNesting);
                throw; 
            }
        } 
 
        public override int EndRead(IAsyncResult asyncResult) {
 
            if (Interlocked.Decrement(ref m_ReadNesting) != 0) {
                Interlocked.Increment(ref m_ReadNesting);
                throw new InvalidOperationException(SR.GetString(SR.net_io_invalidendcall, "EndRead"));
            } 

            if (asyncResult == null) { 
                throw new ArgumentNullException("asyncResult"); 
            }
 
            InnerAsyncResult myResult = asyncResult as InnerAsyncResult;

            if (myResult == null) {
                // We are just passing IO down, although the shadow stream should be dead for now. 
                GlobalLog.Assert(m_ShadowStreamIsDead, "ForwardingReadStream::EndRead|m_ShadowStreamIsDead is false and asyncResult is not of InnerAsyncResult type {0}.", asyncResult.GetType().FullName);
                int bytes = WrappedStream.EndRead(asyncResult); 
                if (bytes == 0) 
                    m_SeenReadEOF = true;
            } 

            // this is our wrapped AsyncResult
            bool suceess = false;
            try { 
                myResult.InternalWaitForCompletion();
                // Exception? 
                if (myResult.Result is Exception) 
                    throw (Exception)(myResult.Result);
                suceess = true; 
            }
            finally {
                if (!suceess && !m_ShadowStreamIsDead) {
                    m_ShadowStreamIsDead = true; 
                    if (m_ShadowStream is ICloseEx)
                        ((ICloseEx)m_ShadowStream).CloseEx(CloseExState.Abort | CloseExState.Silent); 
                    else 
                        m_ShadowStream.Close();
                } 
            }

            // Report the read count
            return (int)myResult.Result; 
        }
 
        // Subclasses should use Dispose(bool, CloseExState) 
        protected sealed override void Dispose(bool disposing) {
            Dispose(disposing, CloseExState.Normal); 
        }

        private int _Disposed;
        void ICloseEx.CloseEx(CloseExState closeState) { 

            if (Interlocked.Increment(ref _Disposed) == 1) { 
                // This would allow us to cache the response stream that user throws away 
                // Next time the cached version could save us from an extra roundtrip
                if (closeState == CloseExState.Silent) { 
                    try {
                        int total = 0;
                        int bytesRead;
                        while (total < ConnectStream.s_DrainingBuffer.Length && (bytesRead = Read(ConnectStream.s_DrainingBuffer, 0, ConnectStream.s_DrainingBuffer.Length)) > 0) { 
                            total += bytesRead;
                        } 
                    } 
                    catch (Exception exception) {
                        //ATTN: this path will swalow errors regardless of m_IsThrowOnWriteError setting 
                        //      A "Silent" close is for an intermediate response that is to be ignored anyway
                        if (exception is ThreadAbortException || exception is StackOverflowException || exception is OutOfMemoryException) {
                            throw;
                        } 
                    }
                } 
 
                Dispose(true, closeState);
            } 
        }

        protected virtual void Dispose(bool disposing, CloseExState closeState) {
 
            // All below calls should already be idempotent
 
            try { 
                if (disposing) {
                    try { 
                        ICloseEx icloseEx = WrappedStream as ICloseEx;
                        if (icloseEx != null) {
                            icloseEx.CloseEx(closeState);
                        } 
                        else {
                            WrappedStream.Close(); 
                        } 
                    }
                    finally { 

                        // Notify the wirte stream on a partial response if did not see EOF on read
                        if (!m_SeenReadEOF)
                            closeState |= CloseExState.Abort; 

                        // 
                        // We don't want to touch m_ShadowStreamIsDead because Close() can be called from other thread while IO is in progress. 
                        // We assume that all streams used by this class are thread safe on Close().
                        // m_ShadowStreamIsDead = true; 

                        if (m_ShadowStream is ICloseEx)
                            ((ICloseEx)m_ShadowStream).CloseEx(closeState);
                        else 
                            m_ShadowStream.Close();
                    } 
                } 
            }
            finally { 
                base.Dispose(disposing);
            }
        }
 
        public override bool CanTimeout {
            get { 
                return WrappedStream.CanTimeout && m_ShadowStream.CanTimeout; 
            }
        } 

        public override int ReadTimeout {
            get {
                return WrappedStream.ReadTimeout; 
            }
            set { 
                WrappedStream.ReadTimeout = m_ShadowStream.ReadTimeout = value; 
            }
        } 

        public override int WriteTimeout {
            get {
                return m_ShadowStream.WriteTimeout; 
            }
            set { 
                WrappedStream.WriteTimeout = m_ShadowStream.WriteTimeout = value; 
            }
        } 
    }

    //
    // This stream will listen on the parent stream Close. 
    // Assuming the parent stream represents a READ stream such as CombinedReadStream or a response stream.
    // When the paretn stream is closed this wrapper will update the metadata associated with the entry. 
    internal class MetadataUpdateStream : BaseWrapperStream, ICloseEx { 
        private RequestCache m_Cache;
        private string      m_Key; 
        private DateTime    m_Expires;
        private DateTime    m_LastModified;
        private DateTime    m_LastSynchronized;
        private TimeSpan    m_MaxStale; 
        private StringCollection m_EntryMetadata;
        private StringCollection m_SystemMetadata; 
        private bool        m_CacheDestroy; 
        private bool        m_IsStrictCacheErrors;
 

        internal MetadataUpdateStream(  Stream parentStream,
                                        RequestCache cache,
                                        string      key, 
                                        DateTime    expiresGMT,
                                        DateTime    lastModifiedGMT, 
                                        DateTime    lastSynchronizedGMT, 
                                        TimeSpan    maxStale,
                                        StringCollection entryMetadata, 
                                        StringCollection systemMetadata,
                                        bool        isStrictCacheErrors)
            : base(parentStream)
        { 
            m_Cache             = cache;
            m_Key               = key; 
            m_Expires           = expiresGMT; 
            m_LastModified      = lastModifiedGMT;
            m_LastSynchronized  = lastSynchronizedGMT; 
            m_MaxStale          = maxStale;
            m_EntryMetadata     = entryMetadata;
            m_SystemMetadata    = systemMetadata;
            m_IsStrictCacheErrors = isStrictCacheErrors; 
        }
 
        // 
        // This constructor will result in removing a cache entry upon closure
        // 
        private MetadataUpdateStream(Stream parentStream, RequestCache cache, string key, bool isStrictCacheErrors)
            : base(parentStream)
        {
            m_Cache             = cache; 
            m_Key               = key;
            m_CacheDestroy      = true; 
            m_IsStrictCacheErrors = isStrictCacheErrors; 
        }
        // 
        //
        //
        /*
        // Consider removing. 
        public static Stream CreateEntryRemovalStream(  Stream parentStream, RequestCache cache, string key, bool isStrictCacheErrors)
        { 
            return new MetadataUpdateStream(parentStream, cache, key, isStrictCacheErrors); 
        }
        */ 
        //
        public override bool CanRead {
            get {return WrappedStream.CanRead;}
        } 
        //
        // If CanSeek is false, Position, Seek, Length, and SetLength should throw. 
        public override bool CanSeek { 
            get {return WrappedStream.CanSeek;}
        } 
        //
        public override bool CanWrite {
            get {return WrappedStream.CanWrite;}
        } 
        //
        public override long Length { 
            get {return WrappedStream.Length;} 
        }
        // 
        public override long Position {
            get {return WrappedStream.Position;}

            set {WrappedStream.Position = value;} 
        }
 
        public override long Seek(long offset, SeekOrigin origin) { 
            return WrappedStream.Seek(offset, origin);
        } 

        public override void SetLength(long value) {
            WrappedStream.SetLength(value);
        } 

        public override void Write(byte[] buffer, int offset, int count) { 
            WrappedStream.Write(buffer, offset, count); 
        }
 
        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) {
            return WrappedStream.BeginWrite(buffer, offset, count, callback, state);
        }
 
        public override void EndWrite(IAsyncResult asyncResult) {
            WrappedStream.EndWrite(asyncResult); 
        } 

        public override void Flush() { 
            WrappedStream.Flush();
        }

        public override int Read(byte[] buffer, int offset, int count) { 
            return WrappedStream.Read(buffer, offset,  count);
        } 
 
        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) {
            return WrappedStream.BeginRead(buffer, offset, count, callback, state); 
        }

        public override int EndRead(IAsyncResult asyncResult) {
            return WrappedStream.EndRead(asyncResult); 
        }
 
        // Subclasses should use Dispose(bool, CloseExState) 
        protected sealed override void Dispose(bool disposing) {
            Dispose(disposing, CloseExState.Normal); 
        }

        void ICloseEx.CloseEx(CloseExState closeState) {
            Dispose(true, closeState); 
        }
 
        public override bool CanTimeout { 
            get {
                return WrappedStream.CanTimeout; 
            }
        }

        public override int ReadTimeout { 
            get {
                return WrappedStream.ReadTimeout; 
            } 
            set {
                WrappedStream.ReadTimeout = value; 
            }
        }

        public override int WriteTimeout { 
            get {
                return WrappedStream.WriteTimeout; 
            } 
            set {
                WrappedStream.WriteTimeout = value; 
            }
        }

        private int _Disposed; 
        protected virtual void Dispose(bool disposing, CloseExState closeState) {
 
            try { 
                if (Interlocked.Increment(ref _Disposed) == 1) {
                    if (disposing) { 
                        ICloseEx icloseEx = WrappedStream as ICloseEx;

                        if (icloseEx != null) {
                            icloseEx.CloseEx(closeState); 
                        }
                        else { 
                            WrappedStream.Close(); 
                        }
 
                        if (m_CacheDestroy)
                        {
                            if (m_IsStrictCacheErrors)
                            { 
                                m_Cache.Remove(m_Key);
                            } 
                            else 
                            {
                                m_Cache.TryRemove(m_Key); 
                            }
                        }
                        else
                        { 
                            if (m_IsStrictCacheErrors)
                            { 
                                m_Cache.Update(m_Key, m_Expires, m_LastModified, m_LastSynchronized, m_MaxStale, m_EntryMetadata, m_SystemMetadata); 
                            }
                            else 
                            {
                                m_Cache.TryUpdate(m_Key, m_Expires, m_LastModified, m_LastSynchronized, m_MaxStale, m_EntryMetadata, m_SystemMetadata);
                            }
 
                        }
                    } 
                } 
            }
            finally { 
                base.Dispose(disposing);
            }
        }
    } 

    // 
    // This stream is for Partial responses. 
    // It will scroll to the given position and limit the original stream windows to given size
    internal class RangeStream : BaseWrapperStream, ICloseEx { 
        long    m_Offset;
        long    m_Size;
        long    m_Position;
 
        internal RangeStream (Stream parentStream, long offset, long size)
            : base(parentStream) 
        { 
            m_Offset            = offset;
            m_Size              = size; 
            if (WrappedStream.CanSeek) {
                WrappedStream.Position = offset;
                m_Position = offset;
            } 
            else {
                // for now we expect a FileStream that is seekable. 
                throw new NotSupportedException(SR.GetString(SR.net_cache_non_seekable_stream_not_supported)); 
            }
        } 

        public override bool CanRead {
            get {return WrappedStream.CanRead;}
        } 

        // If CanSeek is false, Position, Seek, Length, and SetLength should throw. 
        public override bool CanSeek { 
            get {return WrappedStream.CanSeek;}
        } 

        public override bool CanWrite {
            get {return WrappedStream.CanWrite;}
        } 

        public override long Length { 
            get { 
                long dummy = WrappedStream.Length;
                return m_Size; 
            }
        }

        public override long Position { 
            get {return WrappedStream.Position-m_Offset;}
 
            set { 
                value += m_Offset;
                if (value > m_Offset + m_Size) { 
                    value = m_Offset + m_Size;
                }
                WrappedStream.Position = value;
            } 
        }
 
        public override long Seek(long offset, SeekOrigin origin) { 
            switch (origin) {
            case SeekOrigin.Begin: 
                        offset += m_Offset;
                        if (offset > m_Offset+m_Size) {
                            offset = m_Offset+m_Size;
                        } 
                        if (offset < m_Offset) {
                            offset = m_Offset; 
                        } 
                        break;
            case SeekOrigin.End: 
                        offset -= (m_Offset+m_Size);
                        if (offset > 0) {
                            offset = 0;
                        } 
                        if (offset < -m_Size) {
                            offset = -m_Size; 
                        } 
                        break;
            default: 
                        if (m_Position+offset > m_Offset+m_Size) {
                            offset = (m_Offset+m_Size) - m_Position;
                        }
                        if (m_Position+offset < m_Offset) { 
                            offset = m_Offset-m_Position;
                        } 
                        break; 
            }
            m_Position=WrappedStream.Seek(offset, origin); 
            return m_Position-m_Offset;
        }

        public override void SetLength(long value) { 
            throw new NotSupportedException(SR.GetString(SR.net_cache_unsupported_partial_stream));
        } 
 
        public override void Write(byte[] buffer, int offset, int count) {
            if (m_Position + count > m_Offset+m_Size) { 
                throw new NotSupportedException(SR.GetString(SR.net_cache_unsupported_partial_stream));
            }
            WrappedStream.Write(buffer, offset, count);
            m_Position += count; 
        }
 
        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { 
            if (m_Position+offset > m_Offset+m_Size) {
                throw new NotSupportedException(SR.GetString(SR.net_cache_unsupported_partial_stream)); 
            }
            return WrappedStream.BeginWrite(buffer, offset, count, callback, state);
        }
 
        public override void EndWrite(IAsyncResult asyncResult) {
            WrappedStream.EndWrite(asyncResult); 
            m_Position = WrappedStream.Position; 
        }
 
        public override void Flush() {
            WrappedStream.Flush();
        }
 
        public override int Read(byte[] buffer, int offset, int count) {
            if (m_Position >= m_Offset+m_Size) { 
                return 0; 
            }
            if (m_Position + count > m_Offset+m_Size) { 
                count = (int)(m_Offset + m_Size - m_Position);
            }
            int result = WrappedStream.Read(buffer, offset,  count);
            m_Position += result; 
            return result;
        } 
 
        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) {
            if (m_Position >= m_Offset+m_Size) { 
                count = 0;
            }
            else if (m_Position + count > m_Offset+m_Size) {
                count = (int)(m_Offset + m_Size - m_Position); 
            }
            return WrappedStream.BeginRead(buffer, offset, count, callback, state); 
        } 

        public override int EndRead(IAsyncResult asyncResult) { 
            int result = WrappedStream.EndRead(asyncResult);
            m_Position += result;
            return result;
        } 

        // Subclasses should use Dispose(bool, CloseExState) 
        protected sealed override void Dispose(bool disposing) { 
            Dispose(disposing, CloseExState.Normal);
        } 

        void ICloseEx.CloseEx(CloseExState closeState) {
            Dispose(true, closeState);
        } 

        public override bool CanTimeout { 
            get { 
                return WrappedStream.CanTimeout;
            } 
        }

        public override int ReadTimeout {
            get { 
                return WrappedStream.ReadTimeout;
            } 
            set { 
                WrappedStream.ReadTimeout = value;
            } 
        }

        public override int WriteTimeout {
            get { 
                return WrappedStream.WriteTimeout;
            } 
            set { 
                WrappedStream.WriteTimeout = value;
            } 
        }

        protected virtual void Dispose(bool disposing, CloseExState closeState) {
 
            // All calls below should already be idempotent.
 
            try 
            {
                if (disposing) { 

                    ICloseEx icloseEx = WrappedStream as ICloseEx;

                    if (icloseEx != null) { 
                        icloseEx.CloseEx(closeState);
                    } 
                    else { 
                        WrappedStream.Close();
                    } 
                }
            }
            finally
            { 
                base.Dispose(disposing);
            } 
 
        }
    } 

}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK