_StreamFramer.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Net / System / Net / _StreamFramer.cs / 1305376 / _StreamFramer.cs

                            /*++ 
Copyright (c) 2000 Microsoft Corporation

Module Name:
 
    _StreamFramer.cs
 
Abstract: 

 
Author:

    Mauro Ottaviani   original implementation
    Alexei Vopilov    20-Jul-2002 made it generic enough 
                      (still not perfect, consider IStreamFramer interface)
 
Revision History: 

--*/ 

namespace System.Net {
    using System;
    using System.IO; 
    using System.Runtime.InteropServices;
    using System.Threading; 
    using System.ComponentModel; 
    using System.Globalization;
    using System.Net; 
    using System.Net.Sockets;


    internal class StreamFramer { 
        private Stream m_Transport;
        private bool m_Eof; 
 

        private FrameHeader m_WriteHeader    = new FrameHeader(); 
        private FrameHeader m_CurReadHeader  = new FrameHeader();
        private FrameHeader m_ReadVerifier   = new FrameHeader(FrameHeader.IgnoreValue,
                                                               FrameHeader.IgnoreValue,
                                                               FrameHeader.IgnoreValue); 

        //private const int   c_DefaultBufferSize = 1024; 
        //private int         m_BufferSize  = c_DefaultBufferSize; 
        //private byte[]      m_ReadBuffer  = new byte[FrameHeader.SizeOf + m_BufferSize];
        //private int         m_CurReadOffset; 

        private byte[]    m_ReadHeaderBuffer;
        private byte[]    m_WriteHeaderBuffer;
 
        private readonly AsyncCallback m_ReadFrameCallback;
        private readonly AsyncCallback m_BeginWriteCallback; 
 

        private NetworkStream m_NetworkStream;  //optimizing writes 

        public StreamFramer(Stream Transport) {
            if (Transport == null || Transport == Stream.Null) {
                throw new ArgumentNullException("Transport"); 
            }
            m_Transport = Transport; 
            if(m_Transport.GetType() == typeof(NetworkStream)){ 
                m_NetworkStream = Transport as NetworkStream;
            } 
            m_ReadHeaderBuffer = new byte[m_CurReadHeader.Size];
            m_WriteHeaderBuffer = new byte[m_WriteHeader.Size];

            m_ReadFrameCallback = new AsyncCallback(ReadFrameCallback); 
            m_BeginWriteCallback = new AsyncCallback(BeginWriteCallback);
 
        } 

        /* 
        // Consider removing.
        public FrameHeader m_ReadVerifierHeader {
            get {
                return m_ReadVerifier; 
            }
            // May not be called while IO is in progress 
            set { 
                m_ReadVerifier = value;
                m_CurReadHeader = m_ReadVerifier.Clone(); 
                m_ReadHeaderBuffer = new byte[m_CurReadHeader.Size];
            }
        }
        */ 

        public FrameHeader ReadHeader { 
            get { 
                return m_CurReadHeader;
            } 
        }

        public FrameHeader WriteHeader {
            get { 
                return m_WriteHeader;
            } 
            /* 
            // Consider removing.
            // May not be called while IO is in progress 
            set {
                m_WriteHeader = value;
                m_WriteHeaderBuffer = new byte[m_WriteHeader.Size];
            } 
            */
        } 
 
        public Stream Transport {
            get { 
                return m_Transport;
            }
        }
 
        /*
        // Consider removing. 
        public bool EndOfFile { 
            get {
                return m_Eof; 
            }
        }
        */
 
        /*
        // Consider removing. 
        public bool CanRead { 
            get {
                return Transport.CanRead; 
            }
        }
        */
 
        /*
        // Consider removing. 
        public bool CanWrite { 
            get {
                return Transport.CanWrite; 
            }
        }
        */
 
        public byte[] ReadMessage() {
            if (m_Eof) { 
                return null; 
            }
 
            int offset = 0;
            byte[] buffer = m_ReadHeaderBuffer;

            int bytesRead; 
            while (offset < buffer.Length) {
                bytesRead = Transport.Read(buffer, offset, buffer.Length - offset); 
                if (bytesRead == 0) { 
                    if (offset == 0) {
                        // m_Eof, return null 
                        m_Eof = true;
                        return null;
                    }
                    else { 
                        throw new IOException(SR.GetString(SR.net_io_readfailure, SR.GetString(SR.net_io_connectionclosed)));
                    } 
                } 
                offset += bytesRead;
            } 

            m_CurReadHeader.CopyFrom(buffer, 0, m_ReadVerifier);
            if (m_CurReadHeader.PayloadSize > m_CurReadHeader.MaxMessageSize)
            { 
                throw new InvalidOperationException(SR.GetString(SR.net_frame_size,
                                                               m_CurReadHeader.MaxMessageSize.ToString(NumberFormatInfo.InvariantInfo), 
                                                               m_CurReadHeader.PayloadSize.ToString(NumberFormatInfo.InvariantInfo))); 
            }
 
            buffer = new byte[m_CurReadHeader.PayloadSize];

            offset = 0;
            while (offset < buffer.Length) { 
                bytesRead = Transport.Read(buffer, offset, buffer.Length - offset);
                if (bytesRead == 0) { 
                    throw new IOException(SR.GetString(SR.net_io_readfailure, SR.GetString(SR.net_io_connectionclosed))); 
                }
                offset += bytesRead; 
            }
            return buffer;
        }
 
        public IAsyncResult BeginReadMessage(AsyncCallback asyncCallback, object stateObject) {
            WorkerAsyncResult workerResult; 
 
            if (m_Eof){
                workerResult = new WorkerAsyncResult(this, stateObject, asyncCallback, null, 0, 0); 
                workerResult.InvokeCallback(-1);
                return workerResult;
            }
            workerResult = new WorkerAsyncResult(this, stateObject, asyncCallback, 
                                                                   m_ReadHeaderBuffer, 0,
                                                                   m_ReadHeaderBuffer.Length); 
 
            IAsyncResult result = Transport.BeginRead(m_ReadHeaderBuffer, 0, m_ReadHeaderBuffer.Length,
                                      m_ReadFrameCallback, workerResult); 
            if (result.CompletedSynchronously)
            {
                ReadFrameComplete(result);
            } 

            return workerResult; 
        } 

        private void ReadFrameCallback(IAsyncResult transportResult) 
        {
            GlobalLog.Assert(transportResult.AsyncState is WorkerAsyncResult, "StreamFramer::ReadFrameCallback|The state expected to be WorkerAsyncResult, received:{0}.", transportResult.GetType().FullName);
            if (transportResult.CompletedSynchronously)
            { 
                return;
            } 
 
            WorkerAsyncResult workerResult = (WorkerAsyncResult) transportResult.AsyncState;
 
            try
            {
                ReadFrameComplete(transportResult);
            } 
            catch (Exception e) {
                if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException) { 
                    throw; 
                }
 
                if (!(e is IOException)) {
                    e = new System.IO.IOException(SR.GetString(SR.net_io_readfailure, e.Message), e);
                }
 
                // Let's call user callback and he call us back and we will throw
                workerResult.InvokeCallback(e); 
            } 
        }
 
        // IO COMPLETION CALLBACK
        //
        // This callback is responsible for getting complete protocol frame
        // First, it reads the header 
        // Second, it determines the frame size
        // Third, loops while not all frame received or an error. 
        // 
        private void ReadFrameComplete(IAsyncResult transportResult)
        { 
            do
            {
                GlobalLog.Assert(transportResult.AsyncState is WorkerAsyncResult, "StreamFramer::ReadFrameComplete|The state expected to be WorkerAsyncResult, received:{0}.", transportResult.GetType().FullName);
                WorkerAsyncResult workerResult = (WorkerAsyncResult) transportResult.AsyncState; 

                int bytesRead = Transport.EndRead(transportResult); 
                workerResult.Offset += bytesRead; 

                GlobalLog.Assert(workerResult.Offset <= workerResult.End, "StreamFramer::ReadFrameCallback|WRONG: offset - end = {0}", workerResult.Offset - workerResult.End); 

                if (bytesRead <= 0) {
                    // (by design) This indicates the stream has receives EOF
                    // If we are in the middle of a Frame - fail, otherwise - produce EOF 
                    object result = null;
                    if (!workerResult.HeaderDone && workerResult.Offset == 0) { 
                        result = (object)-1; 
                    }
                    else { 
                        result = new System.IO.IOException(SR.GetString(SR.net_frame_read_io));

                    }
                    workerResult.InvokeCallback(result); 
                    return;
                } 
 
                if (workerResult.Offset >= workerResult.End) {
                    if (!workerResult.HeaderDone) { 
                        workerResult.HeaderDone = true;
                        // This indicates the header has been read succesfully
                        m_CurReadHeader.CopyFrom(workerResult.Buffer, 0, m_ReadVerifier);
                        int payloadSize = m_CurReadHeader.PayloadSize; 
                        if (payloadSize < 0) {
                            // Let's call user callback and he call us back and we will throw 
                            workerResult.InvokeCallback(new System.IO.IOException(SR.GetString(SR.net_frame_read_size))); 
                        }
                        if (payloadSize == 0) { 
                            // report emtpy frame (NOT eof!) to the caller, he might be interested in
                            workerResult.InvokeCallback(0);
                            return;
                        } 
                        if (payloadSize > m_CurReadHeader.MaxMessageSize)
                        { 
                            throw new InvalidOperationException(SR.GetString(SR.net_frame_size, 
                                                                            m_CurReadHeader.MaxMessageSize.ToString(NumberFormatInfo.InvariantInfo),
                                                                            payloadSize.ToString(NumberFormatInfo.InvariantInfo))); 
                        }
                        // Start reading the remaining frame data (note header does not count)
                        byte[] frame = new byte[payloadSize];
                        // Save the ref of the data block 
                        workerResult.Buffer = frame;
                        workerResult.End = frame.Length; 
                        workerResult.Offset = 0; 
                        // Transport.BeginRead below will pickup those changes
                    } 
                    else {
                        workerResult.HeaderDone = false; //reset for optional object reuse
                        workerResult.InvokeCallback(workerResult.End);
                        return; 
                    }
                } 
                // This means we need more data to complete the data block 
                transportResult = Transport.BeginRead(workerResult.Buffer, workerResult.Offset, workerResult.End - workerResult.Offset,
                                            m_ReadFrameCallback, workerResult); 
            } while(transportResult.CompletedSynchronously);
        }

        // 
        // User will call this when workerResult gets signalled
        // 
        // On Beginread User always gets back our WorkerAsyncResult 
        // The Result property represents either a number of bytes read or an
        // exception put by our async state machine 
        //
        public byte[] EndReadMessage(IAsyncResult asyncResult) {
            if (asyncResult==null) {
                throw new ArgumentNullException("asyncResult"); 
            }
            WorkerAsyncResult workerResult = asyncResult as WorkerAsyncResult; 
 
            if (workerResult == null) {
                throw new ArgumentException(SR.GetString(SR.net_io_async_result, typeof(WorkerAsyncResult).FullName), "asyncResult"); 
            }
            if (!workerResult.InternalPeekCompleted) {
                workerResult.InternalWaitForCompletion();
            } 

            if (workerResult.Result is Exception) { 
                throw (Exception)(workerResult.Result); 
            }
 
            int size = (int)workerResult.Result;
            if (size == -1) {
                m_Eof = true;
                return null; 
            }
            else if (size == 0) { 
                //empty frame 
                return new byte[0];
            } 

            return workerResult.Buffer;
        }
 
        //
        // 
        // 
        //
        public void WriteMessage(byte[] message) { 
            if (message == null) {
                throw new ArgumentNullException("message");
            }
            m_WriteHeader.PayloadSize = message.Length; 
            m_WriteHeader.CopyTo(m_WriteHeaderBuffer, 0);
 
            if (m_NetworkStream != null && message.Length != 0) { 
                BufferOffsetSize[] buffers = new BufferOffsetSize[2];
                buffers[0] = new BufferOffsetSize(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, false); 
                buffers[1] = new BufferOffsetSize(message, 0, message.Length, false);
                m_NetworkStream.MultipleWrite(buffers);
            }
            else { 
                Transport.Write(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length);
                if (message.Length==0) { 
                    return; 
                }
                Transport.Write(message, 0, message.Length); 
            }
        }

        // 
        //
        // 
        // 
        public IAsyncResult BeginWriteMessage(byte[] message, AsyncCallback asyncCallback, object stateObject) {
            if (message == null) { 
                throw new ArgumentNullException("message");
            }

            m_WriteHeader.PayloadSize = message.Length; 
            m_WriteHeader.CopyTo(m_WriteHeaderBuffer, 0);
 
            if (m_NetworkStream != null && message.Length != 0) { 
                BufferOffsetSize[] buffers = new BufferOffsetSize[2];
                buffers[0] = new BufferOffsetSize(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, false); 
                buffers[1] = new BufferOffsetSize(message, 0, message.Length, false);
                return m_NetworkStream.BeginMultipleWrite(buffers, asyncCallback, stateObject);
            }
 
            if (message.Length == 0) {
                return Transport.BeginWrite(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, 
                                                   asyncCallback, stateObject); 
            }
            //Will need two async writes 
            // Prepare the second
            WorkerAsyncResult workerResult = new WorkerAsyncResult(this, stateObject, asyncCallback,
                                                                   message, 0, message.Length);
            // Charge the first 
            IAsyncResult result = Transport.BeginWrite(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length,
                                 m_BeginWriteCallback, workerResult); 
            if (result.CompletedSynchronously) 
            {
                BeginWriteComplete(result); 
            }

            return workerResult;
        } 

        private void BeginWriteCallback(IAsyncResult transportResult) { 
            GlobalLog.Assert(transportResult.AsyncState is WorkerAsyncResult, "StreamFramer::BeginWriteCallback|The state expected to be WorkerAsyncResult, received:{0}.", transportResult.AsyncState.GetType().FullName); 
            if (transportResult.CompletedSynchronously)
            { 
                return;
            }

            WorkerAsyncResult workerResult = (WorkerAsyncResult) transportResult.AsyncState; 

            try 
            { 
                BeginWriteComplete(transportResult);
            } 
            catch (Exception e)
            {
                if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException)
                { 
                    throw;
                } 
 
                workerResult.InvokeCallback(e);
            } 
        }

        // IO COMPLETION CALLBACK
        // 
        // Called when user IO request was wrapped to do several underlined IO
        // 
        private void BeginWriteComplete(IAsyncResult transportResult) 
        {
            do 
            {
                WorkerAsyncResult workerResult = (WorkerAsyncResult)transportResult.AsyncState;

                //First, complete the previous portion write 
                Transport.EndWrite(transportResult);
                //Check on exit criterion 
                if (workerResult.Offset == workerResult.End) { 
                    workerResult.InvokeCallback();
                    return; 
                }
                //setup exit criterion
                workerResult.Offset = workerResult.End;
                //Write next portion (frame body) using Async IO 
                transportResult = Transport.BeginWrite(workerResult.Buffer, 0, workerResult.End,
                                            m_BeginWriteCallback, workerResult); 
            } 
            while (transportResult.CompletedSynchronously);
        } 

        public void EndWriteMessage(IAsyncResult asyncResult) {
            if (asyncResult==null) {
                throw new ArgumentNullException("asyncResult"); 
            }
 
            WorkerAsyncResult workerResult = asyncResult as WorkerAsyncResult; 

            if (workerResult != null) { 
                if (!workerResult.InternalPeekCompleted) {
                    workerResult.InternalWaitForCompletion();
                }
                if (workerResult.Result is Exception) { 
                    throw (Exception)(workerResult.Result);
                } 
            } 
            else {
                Transport.EndWrite(asyncResult); 
            }
        }

        /* 
        // Consider removing.
        public void Close() { 
            Transport.Close(); 
        }
        */ 
    }

    //
    // This class wraps an Async IO request 
    // It is based on our internal LazyAsyncResult helper
    // - If ParentResult is not null then the base class (LazyAsyncResult) methods must not be used 
    // 
    // - If ParentResult == null, then real user IO request is wrapped
    // 

    /*
    // Consider removing.
    internal delegate void WorkerCallback(WorkerAsyncResult result); 
    */
 
    internal class WorkerAsyncResult : LazyAsyncResult { 
        public byte[]   Buffer;
        public int      Offset; 
        public int      End;
        public bool     IsWrite;
        public WorkerAsyncResult ParentResult;
        /* 
        // Consider removing.
        public WorkerCallback StepDoneCallback; 
        */ 
        public bool     HeaderDone; // This migth be reworked so we read both header and frame in one chunk
        public bool     HandshakeDone; 

        public WorkerAsyncResult(object asyncObject, object asyncState,
                                   AsyncCallback savedAsyncCallback,
                                   byte[] buffer, int offset, int end) 
            : base( asyncObject, asyncState, savedAsyncCallback) {
 
                Buffer      = buffer; 
                Offset      = offset;
                End     = end; 
        }

        /*
        // Consider removing. 
        public WorkerAsyncResult(WorkerAsyncResult parentResult, byte[] buffer, int offset, int end)
             : base(null, null, null) { 
 
                ParentResult = parentResult;
                Buffer      = buffer; 
                Offset      = offset;
                End         = end;
        }
        */ 
    }
 
    // This guy describes the header used in framing of the stream data. 
    internal class FrameHeader {
        public const int IgnoreValue    = -1; 
        public const int HandshakeDoneId= 20;
        public const int HandshakeErrId = 21;
        public const int HandshakeId    = 22;
        public const int DefaultMajorV  = 1; 
        public const int DefaultMinorV  = 0;
 
 
        private int     _MessageId;
        private int     _MajorV; 
        private int     _MinorV;
        private int     _PayloadSize;

        public FrameHeader () { 
            _MessageId = HandshakeId;
            _MajorV    = DefaultMajorV; 
            _MinorV    = DefaultMinorV; 
            _PayloadSize = -1;
 
        }

        public FrameHeader (int messageId, int majorV, int minorV) {
            _MessageId = messageId; 
            _MajorV    = majorV;
            _MinorV    = minorV; 
            _PayloadSize = -1; 
        }
 
        /*
        // Consider removing.
        public FrameHeader Clone() {
            return new FrameHeader(_MessageId, _MajorV, _MinorV); 
        }
        */ 
 
        public int Size {
            get { 
                return 5;
            }
        }
 
        public int MaxMessageSize {
            get { 
                return 0xFFFF; 
            }
        } 

        public  int     MessageId {
            get {
                return  _MessageId; 
            }
            set { 
                _MessageId = value; 
            }
        } 

        public  int     MajorV {
            get {
                return  _MajorV; 
            }
        } 
 
        public  int     MinorV {
            get { 
                return  _MinorV;
            }
        }
 
        public int      PayloadSize {
            get { 
                return  _PayloadSize; 
            }
            set { 
                if (value > MaxMessageSize) {
                    throw new ArgumentException(SR.GetString(SR.net_frame_max_size,
                        MaxMessageSize.ToString(NumberFormatInfo.InvariantInfo),
                        value.ToString(NumberFormatInfo.InvariantInfo)), "PayloadSize"); 
                }
                _PayloadSize = value; 
            } 
        }
 
        public void CopyTo(byte[] dest, int start) {
            dest[start++] = (byte)_MessageId;
            dest[start++] = (byte)_MajorV;
            dest[start++] = (byte)_MinorV; 
            dest[start++] = (byte)((_PayloadSize >> 8) & 0xFF);
            dest[start]   = (byte)(_PayloadSize & 0xFF); 
 
        }
 
        public void CopyFrom(byte[] bytes, int start, FrameHeader verifier) {
            _MessageId      = bytes[start++];
            _MajorV         = bytes[start++];
            _MinorV         = bytes[start++]; 
            _PayloadSize    = (int) ((bytes[start++]<<8) | bytes[start]);
 
            if (verifier.MessageId != FrameHeader.IgnoreValue && MessageId != verifier.MessageId) { 
                throw new InvalidOperationException(SR.GetString(SR.net_io_header_id, "MessageId", MessageId, verifier.MessageId));
            } 

            if (verifier.MajorV != FrameHeader.IgnoreValue && MajorV != verifier.MajorV) {
                throw new InvalidOperationException(SR.GetString(SR.net_io_header_id, "MajorV", MajorV, verifier.MajorV));
            } 

            if (verifier.MinorV != FrameHeader.IgnoreValue && MinorV != verifier.MinorV) { 
                throw new InvalidOperationException(SR.GetString(SR.net_io_header_id, "MinorV", MinorV, verifier.MinorV)); 
            }
 
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK