// Code:
// / WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / SingletonConnectionReader.cs / 1 / SingletonConnectionReader.cs
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//-----------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System;
    using System.ServiceModel;
    using System.ServiceModel.Activation;
    using System.ServiceModel.Dispatcher;
    using System.ServiceModel.Description;
    using System.IO;
    using System.IdentityModel.Claims;
    using System.IdentityModel.Policy;
    using System.ServiceModel.Security;
    using System.Threading;
    using System.ServiceModel.Diagnostics;
    using System.Xml;
    using System.Diagnostics;
    using System.Runtime.CompilerServices;
    using System.Net;

    delegate void ServerSingletonPreambleCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader);
    delegate ISingletonChannelListener SingletonPreambleDemuxCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader);

    interface ISingletonChannelListener
    {
        TimeSpan ReceiveTimeout { get; }
        void ReceiveRequest(RequestContext requestContext, ItemDequeuedCallback callback, bool canDispatchOnThisThread);
    }

    // Reads the framing preamble of a singleton connection on the server side: decodes the Via,
    // resolves the transport settings for it, hands itself to the supplied callback, and later
    // (in CompletePreamble) processes stream upgrades and sends the preamble ACK.
    class ServerSingletonPreambleConnectionReader : InitialServerConnectionReader
    {
        ServerSingletonDecoder decoder;
        ServerSingletonPreambleCallback callback;
        WaitCallback onAsyncReadComplete;
        IConnectionOrientedTransportFactorySettings transportSettings;
        TransportSettingsCallback transportSettingsCallback;
        SecurityMessageProperty security;
        Uri via;
        IConnection rawConnection;
        byte[] connectionBuffer;
        bool isReadPending;
        int offset;
        int size;
        TimeoutHelper receiveTimeoutHelper;
        OnViaDelegate viaDelegate;

        public ServerSingletonPreambleConnectionReader(IConnection connection, ItemDequeuedCallback connectionDequeuedCallback,
            long streamPosition, int offset, int size, TransportSettingsCallback transportSettingsCallback,
            ConnectionClosedCallback closedCallback, ServerSingletonPreambleCallback callback)
            : base(connection, closedCallback)
        {
            this.decoder = new ServerSingletonDecoder(streamPosition, MaxViaSize, MaxContentTypeSize);
            this.offset = offset;
            this.size = size;
            this.callback = callback;
            this.transportSettingsCallback = transportSettingsCallback;
            this.rawConnection = connection;
            this.ConnectionDequeuedCallback = connectionDequeuedCallback;
        }

        public int BufferOffset
        {
            get { return this.offset; }
        }

        public int BufferSize
        {
            get { return this.size; }
        }

        public ServerSingletonDecoder Decoder
        {
            get { return this.decoder; }
        }

        public IConnection RawConnection
        {
            get { return this.rawConnection; }
        }

        public Uri Via
        {
            get { return this.via; }
        }

        public IConnectionOrientedTransportFactorySettings TransportSettings
        {
            get { return this.transportSettings; }
        }

        public SecurityMessageProperty Security
        {
            get { return this.security; }
        }

        TimeSpan GetRemainingTimeout()
        {
            return this.receiveTimeoutHelper.RemainingTime();
        }

        // Decodes buffered bytes (issuing reads as needed) until the decoder reaches
        // PreUpgradeStart, then validates the Via, looks up the transport settings and invokes
        // the preamble callback. Any path that leaves 'success' false aborts this reader.
        void ReadAndDispatch()
        {
            bool success = false;
            try
            {
                while ((size > 0 || !isReadPending) && !IsClosed)
                {
                    if (size == 0)
                    {
                        isReadPending = true;
                        if (onAsyncReadComplete == null)
                        {
                            onAsyncReadComplete = new WaitCallback(OnAsyncReadComplete);
                        }

                        if (Connection.BeginRead(0, connectionBuffer.Length, GetRemainingTimeout(),
                            onAsyncReadComplete, null) == AsyncReadResult.Queued)
                        {
                            break;
                        }
                        HandleReadComplete();
                    }

                    int bytesRead = decoder.Decode(connectionBuffer, offset, size);
                    if (bytesRead > 0)
                    {
                        offset += bytesRead;
                        size -= bytesRead;
                    }

                    if (decoder.CurrentState == ServerSingletonDecoder.State.PreUpgradeStart)
                    {
                        this.via = decoder.Via;
                        if (!Connection.Validate(via))
                        {
                            // This goes through the failure path (Abort) even though it doesn't throw.
                            return;
                        }

                        if (viaDelegate != null)
                        {
                            try
                            {
                                viaDelegate(via);
                            }
                            catch (ServiceActivationException e)
                            {
                                if (DiagnosticUtility.ShouldTraceInformation)
                                {
                                    DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
                                }

                                // return fault and close connection
                                SendFault(FramingEncodingString.ServiceActivationFailedFault);
                                break;
                            }
                        }

                        this.transportSettings = transportSettingsCallback(via);

                        if (transportSettings == null)
                        {
                            EndpointNotFoundException e = new EndpointNotFoundException(SR.GetString(SR.EndpointNotFound, decoder.Via));
                            if (DiagnosticUtility.ShouldTraceInformation)
                            {
                                DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
                            }

                            SendFault(FramingEncodingString.EndpointNotFoundFault);
                            return;
                        }

                        // we have enough information to hand off to a channel. Our job is done
                        callback(this);
                        break;
                    }
                }
                success = true;
            }
            catch (CommunicationException exception)
            {
                if (DiagnosticUtility.ShouldTraceInformation)
                {
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Information);
                }
            }
            catch (TimeoutException exception)
            {
                if (DiagnosticUtility.ShouldTraceInformation)
                {
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Information);
                }
            }
            catch (Exception e)
            {
                if (DiagnosticUtility.IsFatal(e))
                {
                    throw;
                }
                if (!ExceptionHandler.HandleTransportExceptionHelper(e))
                {
                    throw;
                }
                // containment -- we abort ourselves for any error, no extra containment needed
            }
            finally
            {
                if (!success)
                {
                    Abort();
                }
            }
        }

        public void SendFault(string faultString)
        {
            SendFault(faultString, ref this.receiveTimeoutHelper);
        }

        void SendFault(string faultString, ref TimeoutHelper timeoutHelper)
        {
            InitialServerConnectionReader.SendFault(Connection, faultString,
                connectionBuffer, timeoutHelper.RemainingTime(), TransportDefaults.MaxDrainSize);
        }

        // finish preamble (upgrade and ACK)
        // Returns the connection to use for the message exchange: the original connection, or the
        // upgraded one when the client requested a supported stream upgrade.
        public IConnection CompletePreamble(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            if (!transportSettings.MessageEncoderFactory.Encoder.IsContentTypeSupported(decoder.ContentType))
            {
                SendFault(FramingEncodingString.ContentTypeInvalidFault, ref timeoutHelper);
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ProtocolException(SR.GetString(
                    SR.ContentTypeMismatch, decoder.ContentType, transportSettings.MessageEncoderFactory.Encoder.ContentType)));
            }

            StreamUpgradeAcceptor upgradeAcceptor = null;
            StreamUpgradeProvider upgrade = transportSettings.Upgrade;
            if (upgrade != null)
            {
                upgradeAcceptor = upgrade.CreateUpgradeAcceptor();
            }

            IConnection currentConnection = this.Connection;
            for (; ; )
            {
                if (size == 0)
                {
                    offset = 0;
                    size = currentConnection.Read(connectionBuffer, 0, connectionBuffer.Length, timeoutHelper.RemainingTime());
                    if (size == 0)
                    {
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
                    }
                }

                for (; ; )
                {
                    int bytesRead = decoder.Decode(connectionBuffer, offset, size);
                    if (bytesRead > 0)
                    {
                        offset += bytesRead;
                        size -= bytesRead;
                    }

                    switch (decoder.CurrentState)
                    {
                        case ServerSingletonDecoder.State.UpgradeRequest:
                            if (upgradeAcceptor == null)
                            {
                                SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
                                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                                    new ProtocolException(SR.GetString(SR.UpgradeRequestToNonupgradableService, decoder.Upgrade)));
                            }

                            if (!upgradeAcceptor.CanUpgrade(decoder.Upgrade))
                            {
                                SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
                                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ProtocolException(SR.GetString(SR.UpgradeProtocolNotSupported, decoder.Upgrade)));
                            }

                            // accept upgrade
                            currentConnection.Write(ServerSingletonEncoder.UpgradeResponseBytes, 0, ServerSingletonEncoder.UpgradeResponseBytes.Length, true, timeoutHelper.RemainingTime());
                            IConnection connectionToUpgrade = currentConnection;
                            if (this.size > 0)
                            {
                                // hand already-buffered bytes to the upgrade along with the connection
                                connectionToUpgrade = new PreReadConnection(connectionToUpgrade, this.connectionBuffer, this.offset, this.size);
                            }

                            try
                            {
                                currentConnection = InitialServerConnectionReader.UpgradeConnection(connectionToUpgrade, upgradeAcceptor, transportSettings);
                                connectionBuffer = currentConnection.AsyncReadBuffer;
                            }
#pragma warning suppress 56500
                            catch (Exception exception)
                            {
                                if (DiagnosticUtility.IsFatal(exception))
                                    throw;

                                // Audit Authentication Failure
                                WriteAuditFailure(upgradeAcceptor as StreamSecurityUpgradeAcceptor, exception);
                                throw;
                            }
                            break;

                        case ServerSingletonDecoder.State.Start:
                            SetupSecurityIfNecessary(upgradeAcceptor);

                            // we've finished the preamble. Ack and return.
                            currentConnection.Write(ServerSessionEncoder.AckResponseBytes, 0,
                                ServerSessionEncoder.AckResponseBytes.Length, true, timeoutHelper.RemainingTime());
                            return currentConnection;
                    }

                    if (size == 0)
                    {
                        break;
                    }
                }
            }
        }

        // Captures the remote security from a security upgrade acceptor (if one was used) and
        // throws a ProtocolException when the acceptor reports no negotiated security.
        void SetupSecurityIfNecessary(StreamUpgradeAcceptor upgradeAcceptor)
        {
            StreamSecurityUpgradeAcceptor securityUpgradeAcceptor = upgradeAcceptor as StreamSecurityUpgradeAcceptor;
            if (securityUpgradeAcceptor != null)
            {
                this.security = securityUpgradeAcceptor.GetRemoteSecurity();
                if (this.security == null)
                {
                    Exception securityFailedException = new ProtocolException(
                        SR.GetString(SR.RemoteSecurityNotNegotiatedOnStreamUpgrade, this.Via));
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(securityFailedException);
                }

                // Audit Authentication Success
                WriteAuditEvent(securityUpgradeAcceptor, AuditLevel.Success, null);
            }
        }

        #region Transport Security Auditing
        // Best-effort failure audit: non-fatal exceptions raised while auditing are traced, not propagated.
        void WriteAuditFailure(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, Exception exception)
        {
            try
            {
                WriteAuditEvent(securityUpgradeAcceptor, AuditLevel.Failure, exception);
            }
#pragma warning suppress 56500 // covered by FxCop
            catch (Exception auditException)
            {
                if (DiagnosticUtility.IsFatal(auditException))
                {
                    throw;
                }

                DiagnosticUtility.ExceptionUtility.TraceHandledException(auditException, TraceEventType.Error);
            }
        }

        void WriteAuditEvent(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, AuditLevel auditLevel, Exception exception)
        {
            // skip when the configured audit level does not include the requested level
            if ((this.transportSettings.AuditBehavior.MessageAuthenticationAuditLevel & auditLevel) != auditLevel)
            {
                return;
            }

            if (securityUpgradeAcceptor == null)
            {
                return;
            }

            String primaryIdentity = String.Empty;
            SecurityMessageProperty clientSecurity = securityUpgradeAcceptor.GetRemoteSecurity();
            if (clientSecurity != null)
            {
                primaryIdentity = GetIdentityNameFromContext(clientSecurity);
            }

            ServiceSecurityAuditBehavior auditBehavior = this.transportSettings.AuditBehavior;

            if (auditLevel == AuditLevel.Success)
            {
                SecurityAuditHelper.WriteTransportAuthenticationSuccessEvent(auditBehavior.AuditLogLocation,
                    auditBehavior.SuppressAuditFailure, null, this.Via, primaryIdentity);
            }
            else
            {
                SecurityAuditHelper.WriteTransportAuthenticationFailureEvent(auditBehavior.AuditLogLocation,
                    auditBehavior.SuppressAuditFailure, null, this.Via, primaryIdentity, exception);
            }
        }

        [MethodImpl(MethodImplOptions.NoInlining)]
        static string GetIdentityNameFromContext(SecurityMessageProperty clientSecurity)
        {
            return SecurityUtils.GetIdentityNamesFromContext(
                clientSecurity.ServiceSecurityContext.AuthorizationContext);
        }
        #endregion

        // Completes a pending read; a zero-byte read is treated as a premature end of stream.
        void HandleReadComplete()
        {
            offset = 0;
            size = Connection.EndRead();
            isReadPending = false;
            if (size == 0)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
            }
        }

        void OnAsyncReadComplete(object state)
        {
            bool success = false;
            try
            {
                HandleReadComplete();
                ReadAndDispatch();
                success = true;
            }
            catch (CommunicationException exception)
            {
                if (DiagnosticUtility.ShouldTraceInformation)
                {
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Information);
                }
            }
            catch (TimeoutException exception)
            {
                if (DiagnosticUtility.ShouldTraceInformation)
                {
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Information);
                }
            }
            catch (Exception e)
            {
                if (DiagnosticUtility.IsFatal(e))
                {
                    throw;
                }
                if (!ExceptionHandler.HandleTransportExceptionHelper(e))
                {
                    throw;
                }
                // containment -- we abort ourselves for any error, no extra containment needed
            }
            finally
            {
                if (!success)
                {
                    Abort();
                }
            }
        }

        public void StartReading(OnViaDelegate viaDelegate, TimeSpan timeout)
        {
            this.viaDelegate = viaDelegate;
            this.receiveTimeoutHelper = new TimeoutHelper(timeout);
            this.connectionBuffer = Connection.AsyncReadBuffer;
            ReadAndDispatch();
        }
    }

    // Server-side singleton reader created from a completed preamble reader; it reuses the
    // preamble's decoder and returns the raw connection to the demuxer on close.
    class ServerSingletonConnectionReader : SingletonConnectionReader
    {
        ConnectionDemuxer connectionDemuxer;
        ServerSingletonDecoder decoder;
        IConnection rawConnection;
        string contentType;

        public ServerSingletonConnectionReader(ServerSingletonPreambleConnectionReader preambleReader,
            IConnection upgradedConnection, ConnectionDemuxer connectionDemuxer)
            : base(upgradedConnection, preambleReader.BufferOffset, preambleReader.BufferSize,
            preambleReader.Security, preambleReader.TransportSettings, preambleReader.Via)
        {
            this.decoder = preambleReader.Decoder;
            this.contentType = this.decoder.ContentType;
            this.connectionDemuxer = connectionDemuxer;
            this.rawConnection = preambleReader.RawConnection;
        }

        protected override string ContentType
        {
            get { return this.contentType; }
        }

        protected override long StreamPosition
        {
            get { return this.decoder.StreamPosition; }
        }

        // Returns true once the decoder reaches EnvelopeStart; sets isAtEof and returns false on End.
        protected override bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof)
        {
            while (size > 0)
            {
                int bytesRead = decoder.Decode(buffer, offset, size);
                if (bytesRead > 0)
                {
                    offset += bytesRead;
                    size -= bytesRead;
                }

                switch (decoder.CurrentState)
                {
                    case ServerSingletonDecoder.State.EnvelopeStart:
                        // we're at the envelope
                        return true;

                    case ServerSingletonDecoder.State.End:
                        isAtEof = true;
                        return false;
                }
            }

            return false;
        }

        protected override void OnClose(TimeSpan timeout)
        {
            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            // send back EOF and then recycle the connection
            this.Connection.Write(SingletonEncoder.EndBytes, 0, SingletonEncoder.EndBytes.Length, true, timeoutHelper.RemainingTime());
            this.connectionDemuxer.ReuseConnection(this.rawConnection, timeoutHelper.RemainingTime());
        }

        protected override void PrepareMessage(Message message)
        {
            base.PrepareMessage(message);
            IPEndPoint remoteEndPoint = this.rawConnection.RemoteIPEndPoint;

            // pipes will return null
            if (remoteEndPoint != null)
            {
                RemoteEndpointMessageProperty remoteEndpointProperty = new RemoteEndpointMessageProperty(remoteEndPoint);
                message.Properties.Add(RemoteEndpointMessageProperty.Name, remoteEndpointProperty);
            }
        }
    }

    // Base class that reads a single framed message from a connection and closes the
    // connection once both the receive side and the send side report completion.
    abstract class SingletonConnectionReader
    {
        IConnection connection;
        bool doneReceiving;
        bool doneSending;
        bool isAtEof;
        bool isClosed;
        SecurityMessageProperty security;
        object thisLock = new object();
        int offset;
        int size;
        IConnectionOrientedTransportFactorySettings transportSettings;
        Uri via;
        Stream inputStream;

        protected SingletonConnectionReader(IConnection connection, int offset, int size, SecurityMessageProperty security,
            IConnectionOrientedTransportFactorySettings transportSettings, Uri via)
        {
            this.connection = connection;
            this.offset = offset;
            this.size = size;
            this.security = security;
            this.transportSettings = transportSettings;
            this.via = via;
        }

        protected IConnection Connection
        {
            get { return this.connection; }
        }

        protected object ThisLock
        {
            get { return this.thisLock; }
        }

        protected virtual string ContentType
        {
            get { return null; }
        }

        protected abstract long StreamPosition { get; }

        public void Abort()
        {
            this.connection.Abort();
        }

        public void DoneReceiving(bool atEof)
        {
            DoneReceiving(atEof, this.transportSettings.CloseTimeout);
        }

        void DoneReceiving(bool atEof, TimeSpan timeout)
        {
            if (!this.doneReceiving)
            {
                this.isAtEof = atEof;
                this.doneReceiving = true;

                if (this.doneSending)
                {
                    this.Close(timeout);
                }
            }
        }

        // Closes at most once: drains the input stream to EOF (if one was created), then calls
        // OnClose. The connection is aborted if anything throws along the way.
        public void Close(TimeSpan timeout)
        {
            lock (ThisLock)
            {
                if (this.isClosed)
                {
                    return;
                }

                this.isClosed = true;
            }

            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            bool success = false;
            try
            {
                // first drain our stream if necessary
                if (this.inputStream != null)
                {
                    byte[] dummy = DiagnosticUtility.Utility.AllocateByteArray(transportSettings.ConnectionBufferSize);
                    while (!this.isAtEof)
                    {
                        this.inputStream.ReadTimeout = TimeoutHelper.ToMilliseconds(timeoutHelper.RemainingTime());
                        int bytesRead = this.inputStream.Read(dummy, 0, dummy.Length);
                        if (bytesRead == 0)
                        {
                            this.isAtEof = true;
                        }
                    }
                }
                OnClose(timeoutHelper.RemainingTime());
                success = true;
            }
            finally
            {
                if (!success)
                {
                    this.Abort();
                }
            }
        }

        protected abstract void OnClose(TimeSpan timeout);

        public void DoneSending(TimeSpan timeout)
        {
            this.doneSending = true;
            if (this.doneReceiving)
            {
                this.Close(timeout);
            }
        }

        protected abstract bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof);

        protected virtual void PrepareMessage(Message message)
        {
            message.Properties.Via = this.via;
            message.Properties.Security = (this.security != null) ? (SecurityMessageProperty)this.security.CreateCopy() : null;
        }

        public RequestContext ReceiveRequest(TimeSpan timeout)
        {
            Message requestMessage = Receive(timeout);
            return new StreamedFramingRequestContext(this, requestMessage);
        }

        public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new ReceiveAsyncResult(this, timeout, callback, state);
        }

        public virtual Message EndReceive(IAsyncResult result)
        {
            return ReceiveAsyncResult.End(result);
        }

        // Synchronously reads framing bytes until DecodeBytes reports the envelope start, then
        // reads the message through the configured encoder. Returns null when the stream ends
        // before an envelope starts.
        public Message Receive(TimeSpan timeout)
        {
            byte[] buffer = DiagnosticUtility.Utility.AllocateByteArray(connection.AsyncReadBufferSize);

            if (size > 0)
            {
                Buffer.BlockCopy(connection.AsyncReadBuffer, offset, buffer, offset, size);
            }

            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
            for (; ; )
            {
                if (DecodeBytes(buffer, ref offset, ref size, ref isAtEof))
                {
                    break;
                }

                if (this.isAtEof)
                {
                    DoneReceiving(true, timeoutHelper.RemainingTime());
                    return null;
                }

                if (size == 0)
                {
                    offset = 0;
                    size = connection.Read(buffer, 0, buffer.Length, timeoutHelper.RemainingTime());
                    if (size == 0)
                    {
                        DoneReceiving(true, timeoutHelper.RemainingTime());
                        return null;
                    }
                }
            }

            // we're ready to read a message
            IConnection singletonConnection = this.connection;
            if (size > 0)
            {
                byte[] initialData = DiagnosticUtility.Utility.AllocateByteArray(size);
                Buffer.BlockCopy(buffer, offset, initialData, 0, size);
                singletonConnection = new PreReadConnection(singletonConnection, initialData);
            }

            Stream connectionStream = new SingletonInputConnectionStream(this, singletonConnection, this.transportSettings);
            this.inputStream = new MaxMessageSizeStream(connectionStream, transportSettings.MaxReceivedMessageSize);
            using (ServiceModelActivity activity = DiagnosticUtility.ShouldUseActivity ? ServiceModelActivity.CreateBoundedActivity(true) : null)
            {
                if (DiagnosticUtility.ShouldUseActivity)
                {
                    ServiceModelActivity.Start(activity, SR.GetString(SR.ActivityProcessingMessage, TraceUtility.RetrieveMessageNumber()), ActivityType.ProcessMessage);
                }

                Message message = null;
                try
                {
                    message = transportSettings.MessageEncoderFactory.Encoder.ReadMessage(
                        this.inputStream, transportSettings.MaxBufferSize, this.ContentType);
                }
                catch (XmlException xmlException)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                        new ProtocolException(SR.GetString(SR.MessageXmlProtocolError), xmlException));
                }

                if (DiagnosticUtility.ShouldUseActivity)
                {
                    TraceUtility.TransferFromTransport(message);
                }

                PrepareMessage(message);

                return message;
            }
        }

        // Async wrapper that runs the synchronous Receive from a scheduled callback.
        class ReceiveAsyncResult : AsyncResult
        {
            static WaitCallback onReceiveScheduled = new WaitCallback(OnReceiveScheduled);

            Message message;
            SingletonConnectionReader parent;
            TimeSpan timeout;

            public ReceiveAsyncResult(SingletonConnectionReader parent, TimeSpan timeout, AsyncCallback callback,
                object state)
                : base(callback, state)
            {
                this.parent = parent;
                this.timeout = timeout;

                // schedule the synchronous Receive via IOThreadScheduler
                IOThreadScheduler.ScheduleCallback(onReceiveScheduled, this);
            }

            public static Message End(IAsyncResult result)
            {
                ReceiveAsyncResult receiveAsyncResult = AsyncResult.End<ReceiveAsyncResult>(result);
                return receiveAsyncResult.message;
            }

            static void OnReceiveScheduled(object state)
            {
                ReceiveAsyncResult thisPtr = (ReceiveAsyncResult)state;

                Exception completionException = null;
                try
                {
                    thisPtr.message = thisPtr.parent.Receive(thisPtr.timeout);
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception exception)
                {
                    if (DiagnosticUtility.IsFatal(exception))
                    {
                        throw;
                    }
                    completionException = exception;
                }
                thisPtr.Complete(false, completionException);
            }
        }

        // Request context whose reply is written to the reader's connection; the reader is
        // told the send side is done after the reply has been written.
        class StreamedFramingRequestContext : RequestContextBase
        {
            IConnection connection;
            SingletonConnectionReader parent;
            IConnectionOrientedTransportFactorySettings settings;
            TimeoutHelper timeoutHelper;

            public StreamedFramingRequestContext(SingletonConnectionReader parent, Message requestMessage)
                : base(requestMessage, parent.transportSettings.CloseTimeout, parent.transportSettings.SendTimeout)
            {
                this.parent = parent;
                this.connection = parent.connection;
                this.settings = parent.transportSettings;
            }

            protected override void OnAbort()
            {
                this.parent.Abort();
            }

            protected override void OnClose(TimeSpan timeout)
            {
                this.parent.Close(timeout);
            }

            protected override void OnReply(Message message, TimeSpan timeout)
            {
                timeoutHelper = new TimeoutHelper(timeout);
                StreamingConnectionHelper.WriteMessage(message, this.connection, false, this.settings, ref timeoutHelper);
                parent.DoneSending(timeoutHelper.RemainingTime());
            }

            protected override IAsyncResult OnBeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state)
            {
                timeoutHelper = new TimeoutHelper(timeout);
                return StreamingConnectionHelper.BeginWriteMessage(message, this.connection, false, this.settings,
                    ref timeoutHelper, callback, state);
            }

            protected override void OnEndReply(IAsyncResult result)
            {
                StreamingConnectionHelper.EndWriteMessage(result);
                parent.DoneSending(timeoutHelper.RemainingTime());
            }
        }

        // ensures that the reader is notified at end-of-stream, and takes care of the framing chunk headers
        class SingletonInputConnectionStream : ConnectionStream
        {
            SingletonMessageDecoder decoder;
            SingletonConnectionReader reader;
            bool atEof;
            byte[] chunkBuffer; // used for when we have overflow
            int chunkBufferOffset;
            int chunkBufferSize;
            int chunkBytesRemaining;

            public SingletonInputConnectionStream(SingletonConnectionReader reader, IConnection connection,
                IDefaultCommunicationTimeouts defaultTimeouts)
                : base(connection, defaultTimeouts)
            {
                this.reader = reader;
                this.decoder = new SingletonMessageDecoder(reader.StreamPosition);
                this.chunkBytesRemaining = 0;
                this.chunkBuffer = new byte[IntEncoder.MaxEncodedSize];
            }

            void AbortReader()
            {
                this.reader.Abort();
            }

            public override void Close()
            {
                this.reader.DoneReceiving(this.atEof);
            }

            // run chunk data through the decoder
            void DecodeData(byte[] buffer, int offset, int size)
            {
                while (size > 0)
                {
                    int bytesRead = decoder.Decode(buffer, offset, size);
                    offset += bytesRead;
                    size -= bytesRead;
                    DiagnosticUtility.DebugAssert(decoder.CurrentState == SingletonMessageDecoder.State.ReadingEnvelopeBytes || decoder.CurrentState == SingletonMessageDecoder.State.ChunkEnd, "");
                }
            }

            // run the current data through the decoder to get valid message bytes
            void DecodeSize(byte[] buffer, ref int offset, ref int size)
            {
                while (size > 0)
                {
                    int bytesRead = decoder.Decode(buffer, offset, size);

                    if (bytesRead > 0)
                    {
                        offset += bytesRead;
                        size -= bytesRead;
                    }

                    switch (decoder.CurrentState)
                    {
                        case SingletonMessageDecoder.State.ChunkStart:
                            this.chunkBytesRemaining = decoder.ChunkSize;

                            // if we have overflow and we're not decoding out of our buffer, copy over
                            if (size > 0 && !object.ReferenceEquals(buffer, this.chunkBuffer))
                            {
                                DiagnosticUtility.DebugAssert(size <= this.chunkBuffer.Length, "");
                                Buffer.BlockCopy(buffer, offset, this.chunkBuffer, 0, size);
                                this.chunkBufferOffset = 0;
                                this.chunkBufferSize = size;
                            }
                            return;

                        case SingletonMessageDecoder.State.End:
                            ProcessEof();
                            return;
                    }
                }
            }

            // Reads from the underlying stream; a zero-byte read is processed as EOF and any
            // exception aborts the reader before propagating.
            int ReadCore(byte[] buffer, int offset, int count)
            {
                int bytesRead = -1;
                try
                {
                    bytesRead = base.Read(buffer, offset, count);
                    if (bytesRead == 0)
                    {
                        ProcessEof();
                    }
                }
                finally
                {
                    if (bytesRead == -1) // there was an exception
                    {
                        AbortReader();
                    }
                }

                return bytesRead;
            }

            public override int Read(byte[] buffer, int offset, int count)
            {
                int result = 0;
                while (true)
                {
                    if (count == 0)
                    {
                        return result;
                    }

                    if (this.atEof)
                    {
                        return result;
                    }

                    // first deal with any residual carryover
                    if (this.chunkBufferSize > 0)
                    {
                        int bytesToCopy = Math.Min(chunkBytesRemaining,
                            Math.Min(this.chunkBufferSize, count));

                        Buffer.BlockCopy(this.chunkBuffer, this.chunkBufferOffset, buffer, offset, bytesToCopy);
                        // keep decoder up to date
                        DecodeData(this.chunkBuffer, this.chunkBufferOffset, bytesToCopy);

                        this.chunkBufferOffset += bytesToCopy;
                        this.chunkBufferSize -= bytesToCopy;
                        this.chunkBytesRemaining -= bytesToCopy;
                        if (this.chunkBytesRemaining == 0 && this.chunkBufferSize > 0)
                        {
                            DecodeSize(this.chunkBuffer, ref this.chunkBufferOffset, ref this.chunkBufferSize);
                        }

                        result += bytesToCopy;
                        offset += bytesToCopy;
                        count -= bytesToCopy;
                    }
                    else if (chunkBytesRemaining > 0)
                    {
                        // We're in the middle of a chunk. Try and include the next chunk size as well
                        int bytesRead = ReadCore(buffer, offset, Math.Min(count, chunkBytesRemaining + IntEncoder.MaxEncodedSize));

                        // keep decoder up to date
                        DecodeData(buffer, offset, Math.Min(bytesRead, this.chunkBytesRemaining));

                        if (bytesRead > chunkBytesRemaining)
                        {
                            result += this.chunkBytesRemaining;
                            int overflowCount = bytesRead - chunkBytesRemaining;
                            int overflowOffset = offset + chunkBytesRemaining;
                            this.chunkBytesRemaining = 0;
                            // read at least part of the next chunk, and put any overflow in this.chunkBuffer
                            DecodeSize(buffer, ref overflowOffset, ref overflowCount);
                        }
                        else
                        {
                            result += bytesRead;
                            this.chunkBytesRemaining -= bytesRead;
                        }

                        return result;
                    }
                    else
                    {
                        // Final case: we have a new chunk. Read the size, and loop around again
                        if (count < IntEncoder.MaxEncodedSize)
                        {
                            // we don't have space for MaxEncodedSize, so it's worth the copy cost to read into a temp buffer
                            this.chunkBufferOffset = 0;
                            this.chunkBufferSize = ReadCore(this.chunkBuffer, 0, this.chunkBuffer.Length);
                            DecodeSize(this.chunkBuffer, ref this.chunkBufferOffset, ref this.chunkBufferSize);
                        }
                        else
                        {
                            int bytesRead = ReadCore(buffer, offset, IntEncoder.MaxEncodedSize);
                            int sizeOffset = offset;
                            DecodeSize(buffer, ref sizeOffset, ref bytesRead);
                        }
                    }
                }
            }

            // Marks EOF once; throws a premature-EOF exception if chunk data is still outstanding
            // or the decoder has not reached its End state, otherwise notifies the reader.
            void ProcessEof()
            {
                if (!this.atEof)
                {
                    this.atEof = true;
                    if (this.chunkBufferSize > 0 || this.chunkBytesRemaining > 0
                        || decoder.CurrentState != SingletonMessageDecoder.State.End)
                    {
                        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
                    }

                    this.reader.DoneReceiving(true);
                }
            }

            public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
            {
                return new ReadAsyncResult(this, buffer, offset, count, callback, state);
            }

            public override int EndRead(IAsyncResult result)
            {
                return ReadAsyncResult.End(result);
            }

            public class ReadAsyncResult : AsyncResult
            {
                SingletonInputConnectionStream parent;
                int result;

                public ReadAsyncResult(SingletonInputConnectionStream parent,
                    byte[] buffer, int offset, int count, AsyncCallback callback, object state)
                    : base(callback, state)
                {
                    this.parent = parent;

                    // the read is performed synchronously and the result completed immediately
                    this.result = this.parent.Read(buffer, offset, count);
                    base.Complete(true);
                }

                public static int End(IAsyncResult result)
                {
                    ReadAsyncResult thisPtr = AsyncResult.End<ReadAsyncResult>(result);
                    return thisPtr.result;
                }
            }
        }
    }

    // Helpers for writing a singleton (streamed-framing) message, plus its end bytes, to a connection.
    static class StreamingConnectionHelper
    {
        public static void WriteMessage(Message message, IConnection connection, bool isRequest,
            IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper)
        {
            byte[] endBytes = null;
            if (message != null)
            {
                MessageEncoder messageEncoder = settings.MessageEncoderFactory.Encoder;
                byte[] envelopeStartBytes = SingletonEncoder.EnvelopeStartBytes;

                bool writeStreamed;
                if (isRequest)
                {
                    endBytes = SingletonEncoder.EnvelopeEndFramingEndBytes;
                    writeStreamed = TransferModeHelper.IsRequestStreamed(settings.TransferMode);
                }
                else
                {
                    endBytes = SingletonEncoder.EnvelopeEndBytes;
                    writeStreamed = TransferModeHelper.IsResponseStreamed(settings.TransferMode);
                }

                if (writeStreamed)
                {
                    connection.Write(envelopeStartBytes, 0, envelopeStartBytes.Length, false, timeoutHelper.RemainingTime());
                    Stream connectionStream = new StreamingOutputConnectionStream(connection, settings);
                    Stream writeTimeoutStream = new TimeoutStream(connectionStream, ref timeoutHelper);
                    messageEncoder.WriteMessage(message, writeTimeoutStream);
                }
                else
                {
                    // the encoder leaves room in front of the message for the start bytes and the size prefix
                    ArraySegment<byte> messageData = messageEncoder.WriteMessage(message,
                        int.MaxValue, settings.BufferManager, envelopeStartBytes.Length + IntEncoder.MaxEncodedSize);
                    messageData = SingletonEncoder.EncodeMessageFrame(messageData);
                    Buffer.BlockCopy(envelopeStartBytes, 0, messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                        envelopeStartBytes.Length);
                    connection.Write(messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                        messageData.Count + envelopeStartBytes.Length, true, timeoutHelper.RemainingTime(), settings.BufferManager);
                }
            }
            else if (isRequest) // context handles response end bytes
            {
                endBytes = SingletonEncoder.EndBytes;
            }

            if (endBytes != null)
            {
                connection.Write(endBytes, 0, endBytes.Length,
                    true, timeoutHelper.RemainingTime());
            }
        }

        public static IAsyncResult BeginWriteMessage(Message message, IConnection connection, bool isRequest,
            IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper,
            AsyncCallback callback, object state)
        {
            return new WriteMessageAsyncResult(message, connection, isRequest, settings, ref timeoutHelper, callback, state);
        }

        public static void EndWriteMessage(IAsyncResult result)
        {
            WriteMessageAsyncResult.End(result);
        }

        // overrides ConnectionStream to add a Framing int at the beginning of each record
        class StreamingOutputConnectionStream : ConnectionStream
        {
            byte[] encodedSize;

            public StreamingOutputConnectionStream(IConnection connection, IDefaultCommunicationTimeouts timeouts)
                : base(connection, timeouts)
            {
                this.encodedSize = new byte[IntEncoder.MaxEncodedSize];
            }

            // Writes the encoded chunk length ahead of the chunk; nothing is written for empty chunks.
            void WriteChunkSize(int size)
            {
                if (size > 0)
                {
                    int bytesEncoded = IntEncoder.Encode(size, encodedSize, 0);
                    base.Connection.Write(encodedSize, 0, bytesEncoded, false, TimeSpan.FromMilliseconds(this.WriteTimeout));
                }
            }

            public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
            {
                WriteChunkSize(count);
                return base.BeginWrite(buffer, offset, count, callback, state);
            }

            public override void WriteByte(byte value)
            {
                WriteChunkSize(1);
                base.WriteByte(value);
            }

            public override void Write(byte[] buffer, int offset, int count)
            {
                WriteChunkSize(count);
                base.Write(buffer, offset, count);
            }
        }

        class WriteMessageAsyncResult : AsyncResult
        {
            IConnection connection;
            MessageEncoder encoder;
            BufferManager bufferManager;
            Message message;
            static AsyncCallback onWriteBufferedMessage;
            static AsyncCallback onWriteStartBytes;
            static WaitCallback onWriteStartBytesScheduled;
            static AsyncCallback onWriteEndBytes = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWriteEndBytes));
            byte[] bufferToFree;
            IConnectionOrientedTransportFactorySettings settings;
            TimeoutHelper timeoutHelper;
            byte[] endBytes;

            public WriteMessageAsyncResult(Message message, IConnection connection, bool isRequest,
                IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper,
                AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.connection = connection;
                this.encoder = settings.MessageEncoderFactory.Encoder;
                this.bufferManager = settings.BufferManager;
                this.timeoutHelper = timeoutHelper;
                this.message = message;
                this.settings = settings;

                bool throwing = true;
                bool completeSelf = false;
                if (message == null)
                {
                    if (isRequest) // context takes care of the end bytes on Close/reader.EOF
                    {
                        this.endBytes = SingletonEncoder.EndBytes;
                    }
                    completeSelf = WriteEndBytes();
                }
                else
                {
                    try
                    {
                        byte[] envelopeStartBytes = SingletonEncoder.EnvelopeStartBytes;
                        bool writeStreamed;
                        if (isRequest)
                        {
                            this.endBytes = SingletonEncoder.EnvelopeEndFramingEndBytes;
                            writeStreamed = TransferModeHelper.IsRequestStreamed(settings.TransferMode);
                        }
                        else
                        {
                            this.endBytes = SingletonEncoder.EnvelopeEndBytes;
                            writeStreamed = TransferModeHelper.IsResponseStreamed(settings.TransferMode);
                        }

                        if (writeStreamed)
                        {
                            if (onWriteStartBytes == null)
                            {
                                onWriteStartBytes = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWriteStartBytes));
                            }

                            IAsyncResult writeStartBytesResult = connection.BeginWrite(envelopeStartBytes, 0, envelopeStartBytes.Length, true,
                                timeoutHelper.RemainingTime(), onWriteStartBytes, this);

                            if (writeStartBytesResult.CompletedSynchronously)
                            {
                                if (onWriteStartBytesScheduled == null)
                                {
                                    onWriteStartBytesScheduled = new WaitCallback(OnWriteStartBytesScheduled);
                                }
                                IOThreadScheduler.ScheduleCallback(onWriteStartBytesScheduled, writeStartBytesResult);
                            }
                        }
                        else
                        {
                            ArraySegment<byte> messageData = settings.MessageEncoderFactory.Encoder.WriteMessage(message,
                                int.MaxValue, this.bufferManager, envelopeStartBytes.Length + IntEncoder.MaxEncodedSize);
                            messageData = SingletonEncoder.EncodeMessageFrame(messageData);
                            this.bufferToFree = messageData.Array;
                            Buffer.BlockCopy(envelopeStartBytes, 0, messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                                envelopeStartBytes.Length);

                            if (onWriteBufferedMessage == null)
                            {
                                onWriteBufferedMessage = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWriteBufferedMessage));
                            }
                            IAsyncResult writeBufferedResult =
                                connection.BeginWrite(messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                                messageData.Count + envelopeStartBytes.Length, true, timeoutHelper.RemainingTime(),
                                onWriteBufferedMessage, this);
                            if (writeBufferedResult.CompletedSynchronously)
                            {
                                completeSelf =
HandleWriteBufferedMessage(writeBufferedResult); } } throwing = false; } finally { if (throwing) { Cleanup(); } } } if (completeSelf) { base.Complete(true); } } public static void End(IAsyncResult result) { AsyncResult.End (result); } void Cleanup() { if (bufferToFree != null) { this.bufferManager.ReturnBuffer(bufferToFree); } } bool HandleWriteStartBytes(IAsyncResult result) { connection.EndWrite(result); Stream connectionStream = new StreamingOutputConnectionStream(connection, settings); Stream writeTimeoutStream = new TimeoutStream(connectionStream, ref timeoutHelper); this.encoder.WriteMessage(message, writeTimeoutStream); return WriteEndBytes(); } bool HandleWriteBufferedMessage(IAsyncResult result) { this.connection.EndWrite(result); return WriteEndBytes(); } bool WriteEndBytes() { if (this.endBytes == null) { Cleanup(); return true; } IAsyncResult result = connection.BeginWrite(endBytes, 0, endBytes.Length, true, timeoutHelper.RemainingTime(), onWriteEndBytes, this); if (!result.CompletedSynchronously) { return false; } return HandleWriteEndBytes(result); } bool HandleWriteEndBytes(IAsyncResult result) { this.connection.EndWrite(result); Cleanup(); return true; } static void OnWriteStartBytes(IAsyncResult result) { if (result.CompletedSynchronously) { return; } OnWriteStartBytesCallbackHelper(result); } static void OnWriteStartBytesScheduled(object state) { OnWriteStartBytesCallbackHelper((IAsyncResult) state); } static void OnWriteStartBytesCallbackHelper(IAsyncResult result) { WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)result.AsyncState; Exception completionException = null; bool completeSelf = false; bool throwing = true; try { completeSelf = thisPtr.HandleWriteStartBytes(result); throwing = false; } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } completeSelf = true; completionException = e; } finally { if (throwing) { thisPtr.Cleanup(); 
} } if (completeSelf) { thisPtr.Complete(false, completionException); } } static void OnWriteBufferedMessage(IAsyncResult result) { if (result.CompletedSynchronously) { return; } WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)result.AsyncState; Exception completionException = null; bool completeSelf = false; bool throwing = true; try { completeSelf = thisPtr.HandleWriteBufferedMessage(result); throwing = false; } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } completeSelf = true; completionException = e; } finally { if (throwing) { thisPtr.Cleanup(); } } if (completeSelf) { thisPtr.Complete(false, completionException); } } static void OnWriteEndBytes(IAsyncResult result) { if (result.CompletedSynchronously) { return; } WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)result.AsyncState; Exception completionException = null; bool completeSelf = false; bool success = false; try { completeSelf = thisPtr.HandleWriteEndBytes(result); success = true; } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } completeSelf = true; completionException = e; } finally { if (!success) { thisPtr.Cleanup(); } } if (completeSelf) { thisPtr.Complete(false, completionException); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- DoubleMinMaxAggregationOperator.cs
- DetailsViewDeleteEventArgs.cs
- EntityDesignerDataSourceView.cs
- EmptyStringExpandableObjectConverter.cs
- ScrollProperties.cs
- XmlDocumentFragment.cs
- DataTableReaderListener.cs
- TypeSemantics.cs
- DataPagerFieldCommandEventArgs.cs
- ImageButton.cs
- SecurityKeyIdentifier.cs
- EditorZoneBase.cs
- CriticalFinalizerObject.cs
- DbExpressionVisitor_TResultType.cs
- CompilerParameters.cs
- TemplatedWizardStep.cs
- OdbcFactory.cs
- ComplexPropertyEntry.cs
- BevelBitmapEffect.cs
- PolicyChain.cs
- DataSetViewSchema.cs
- MatrixAnimationBase.cs
- ArraySegment.cs
- MimeWriter.cs
- XmlTextReaderImplHelpers.cs
- QfeChecker.cs
- ClonableStack.cs
- MethodAccessException.cs
- DefaultProxySection.cs
- PeerContact.cs
- KeySpline.cs
- MobileUserControlDesigner.cs
- HtmlControlPersistable.cs
- ListGeneralPage.cs
- ScrollBarAutomationPeer.cs
- ExtentJoinTreeNode.cs
- CheckBoxRenderer.cs
- ExpressionVisitor.cs
- NativeMethods.cs
- EventSinkHelperWriter.cs
- ParsedAttributeCollection.cs
- ColumnReorderedEventArgs.cs
- BamlLocalizableResourceKey.cs
- GridViewActionList.cs
- SqlInfoMessageEvent.cs
- TextServicesDisplayAttributePropertyRanges.cs
- RawKeyboardInputReport.cs
- RoutedEventValueSerializer.cs
- DnsPermission.cs
- ScriptRef.cs
- ProcessMessagesAsyncResult.cs
- PictureBox.cs
- SendActivityDesignerTheme.cs
- PenContext.cs
- KeysConverter.cs
- coordinatorfactory.cs
- RoleManagerModule.cs
- BitmapCacheBrush.cs
- StsCommunicationException.cs
- DateTimeOffsetConverter.cs
- ReadWriteSpinLock.cs
- TemplateControlCodeDomTreeGenerator.cs
- ListBoxItem.cs
- HttpListenerRequestTraceRecord.cs
- _NegoStream.cs
- WebPartRestoreVerb.cs
- Transform.cs
- Container.cs
- EditorPartDesigner.cs
- OrderPreservingPipeliningSpoolingTask.cs
- InvokeMethodActivity.cs
- PartitionedStreamMerger.cs
- EtwTrace.cs
- StrongNameKeyPair.cs
- NextPreviousPagerField.cs
- PrivilegeNotHeldException.cs
- MarkupExtensionParser.cs
- XmlToDatasetMap.cs
- QueryValue.cs
- SafeHandles.cs
- KnowledgeBase.cs
- TextBoxAutomationPeer.cs
- QilValidationVisitor.cs
- LocationEnvironment.cs
- SqlFunctionAttribute.cs
- WindowsProgressbar.cs
- SqlXmlStorage.cs
- smtppermission.cs
- AsyncResult.cs
- DataAdapter.cs
- EllipseGeometry.cs
- WindowsFormsDesignerOptionService.cs
- OracleBoolean.cs
- ErrorEventArgs.cs
- RichTextBoxConstants.cs
- EdmSchemaError.cs
- MDIControlStrip.cs
- DecimalAnimationUsingKeyFrames.cs
- ErrorStyle.cs
- GridViewRowEventArgs.cs