Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / BufferedOutputAsyncStream.cs / 1 / BufferedOutputAsyncStream.cs
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System;
    using System.Diagnostics;
    using System.Globalization;
    using System.IO;
    using System.ServiceModel;
    using System.Collections;
    using System.Collections.Generic;
    using System.Threading;

    /// <summary>
    /// BufferedOutputAsyncStream is used for writing streamed response.
    /// For performance reasons, the behavior we want is chunk, chunk, chunk,.. terminating chunk without a delay.
    /// We call BeginWrite,BeginWrite,BeginWrite and Close() (close sends the terminating chunk) without
    /// waiting for all outstanding BeginWrites to complete.
    ///
    /// BufferedOutputAsyncStream is not a general-purpose stream wrapper, it requires that the base stream
    ///     1. allows concurrent IO (for multiple BeginWrite calls)
    ///     2. supports the BeginWrite,BeginWrite,BeginWrite,.. Close() calling pattern.
    ///
    /// Currently BufferedOutputAsyncStream is only used to wrap the System.Net.HttpResponseStream,
    /// which satisfies both requirements.
    /// </summary>
    class BufferedOutputAsyncStream : Stream
    {
        // Underlying stream that receives the asynchronous writes.
        Stream stream;
        // Size in bytes of each individual ByteBuffer.
        int bufferSize;
        // Maximum number of ByteBuffer instances that NextBuffer may allocate.
        int bufferLimit;
        // Buffers allocated so far; used round-robin once bufferLimit is reached.
        List<ByteBuffer> buffers;
        // Index into buffers of the buffer currently being filled.
        int currentIndex;

        /// <summary>
        /// Wraps <paramref name="stream"/> and allocates the first buffer.
        /// </summary>
        /// <param name="stream">Stream that buffered data is written to.</param>
        /// <param name="bufferSize">Size in bytes of each buffer.</param>
        /// <param name="bufferLimit">Maximum number of buffers to allocate before reusing them.</param>
        internal BufferedOutputAsyncStream(Stream stream, int bufferSize, int bufferLimit)
        {
            this.stream = stream;
            this.bufferSize = bufferSize;
            this.bufferLimit = bufferLimit;
            this.buffers = new List<ByteBuffer>();
            this.buffers.Add(new ByteBuffer(this.bufferSize, stream));
            this.currentIndex = 0;
        }

        /// <summary>Always false; this stream is write-only.</summary>
        public override bool CanRead
        {
            get { return false; }
        }

        /// <summary>Always false; seeking is not supported.</summary>
        public override bool CanSeek
        {
            get { return false; }
        }

        /// <summary>Reflects whether the underlying stream can be written to.</summary>
        public override bool CanWrite
        {
            get { return stream.CanWrite; }
        }

        // The buffer that Write/WriteByte are currently filling.
        ByteBuffer CurrentBuffer
        {
            get { return buffers[currentIndex]; }
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override long Length
        {
            get
            {
#pragma warning suppress 56503 // [....], required by the Stream.Length contract
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
            }
        }

        /// <summary>Not supported; getter and setter always throw <see cref="NotSupportedException"/>.</summary>
        public override long Position
        {
            get
            {
#pragma warning suppress 56503 // [....], required by the Stream.Position contract
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
            }
            set
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
            }
        }

        /// <summary>
        /// Flushes the current buffer, closes the underlying stream and then waits for
        /// every outstanding asynchronous write to finish. The underlying stream is closed
        /// before the wait on purpose (see the class summary for the calling pattern).
        /// </summary>
        public override void Close()
        {
            CurrentBuffer.Flush();
            stream.Close();
            // Complete all outstanding writes
            WaitForAllWritesToComplete();
        }

        /// <summary>
        /// Starts writing the current buffer's content (if any) and flushes the underlying stream.
        /// Does not wait for pending asynchronous writes.
        /// </summary>
        public override void Flush()
        {
            CurrentBuffer.Flush();
            stream.Flush();
        }

        // Moves to the next buffer. A new buffer is allocated while fewer than bufferLimit
        // exist; otherwise the index wraps to zero. For a reused buffer this blocks until
        // its pending write has completed.
        void NextBuffer()
        {
            currentIndex++;
            if (currentIndex == buffers.Count)
            {
                if (buffers.Count < bufferLimit)
                {
                    // allocate new buffer
                    buffers.Add(new ByteBuffer(bufferSize, stream));
                    return;
                }
                currentIndex = 0;
            }

            DiagnosticUtility.DebugAssert(currentIndex >= 0 && currentIndex < buffers.Count,
                string.Format(CultureInfo.InvariantCulture, "The value ({0}) must be greater than or equal to zero and less than {1}", currentIndex, buffers.Count));

            // Make Sure that the buffer is ready to receive data
            CurrentBuffer.WaitForWriteComplete();
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override int ReadByte()
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override void SetLength(long value)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
        }

        // Blocks until no buffer has a write in flight. Rethrows the first recorded
        // write failure it encounters.
        void WaitForAllWritesToComplete()
        {
            // Complete all outstanding writes
            for (int i = 0; i < buffers.Count; i++)
            {
                buffers[i].WaitForWriteComplete();
            }
        }

        /// <summary>
        /// Copies <paramref name="count"/> bytes into the internal buffers, starting an
        /// asynchronous write on the underlying stream each time a buffer fills up.
        /// </summary>
        /// <param name="buffer">Source data.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to copy.</param>
        /// <param name="count">Number of bytes to copy; nothing happens when it is not positive.</param>
        public override void Write(byte[] buffer, int offset, int count)
        {
            while (count > 0)
            {
                if (CurrentBuffer.IsWritePending)
                {
                    NextBuffer();
                }

                int freeBytes = CurrentBuffer.FreeBytes;   // space left in the CurrentBuffer
                if (freeBytes > 0)
                {
                    if (freeBytes > count)
                        freeBytes = count;

                    CurrentBuffer.CopyData(buffer, offset, freeBytes);
                    offset += freeBytes;
                    count -= freeBytes;
                }

                if (CurrentBuffer.FreeBytes == 0)
                {
                    CurrentBuffer.Flush();
                }
            }
        }

        /// <summary>
        /// Appends a single byte, starting an asynchronous write if that fills the current buffer.
        /// </summary>
        /// <param name="value">The byte to append.</param>
        public override void WriteByte(byte value)
        {
            if (CurrentBuffer.IsWritePending)
            {
                NextBuffer();
            }

            CurrentBuffer.CopyData(value);

            if (CurrentBuffer.FreeBytes == 0)
            {
                CurrentBuffer.Flush();
            }
        }

        // A fixed-size byte array plus the state of the asynchronous write that drains it.
        class ByteBuffer
        {
            byte[] bytes;
            // Number of bytes copied into the buffer and not yet handed to BeginWrite.
            int position = 0;
            Stream stream;
            static AsyncCallback writeCallback = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(WriteCallback));
            // True from the moment Flush issues a write until that write has completed.
            bool writePending = false;
            // True while a thread is blocked in WaitForWriteComplete; lets the callback skip Pulse.
            bool waiting = false;
            // Non-fatal exception thrown by EndWrite on the callback thread; rethrown by WaitForWriteComplete.
            Exception completionException;
            // Dedicated monitor object. The ByteBuffer instance itself is passed to the base
            // stream as AsyncState, so locking on 'this' would expose the monitor to outside code.
            readonly object thisLock = new object();

            internal ByteBuffer(int bufferSize, Stream stream)
            {
                this.bytes = DiagnosticUtility.Utility.AllocateByteArray(bufferSize);
                this.stream = stream;
            }

            internal bool IsWritePending
            {
                get { return writePending; }
            }

            object ThisLock
            {
                get { return this.thisLock; }
            }

            // Space left in the buffer, in bytes.
            internal int FreeBytes
            {
                get { return this.bytes.Length - this.position; }
            }

            // Starts an asynchronous write of the buffered bytes, if there are any.
            // When the write completes synchronously it is ended here and the buffer
            // is immediately reusable.
            internal void Flush()
            {
                if (this.position <= 0)
                    return;

                int bytesToWrite = this.position;
                this.writePending = true;
                this.position = 0;

                bool issued = false;
                try
                {
                    IAsyncResult asyncResult = this.stream.BeginWrite(this.bytes, 0, bytesToWrite, writeCallback, this);
                    if (asyncResult.CompletedSynchronously)
                    {
                        this.stream.EndWrite(asyncResult);
                        this.writePending = false;
                    }
                    issued = true;
                }
                finally
                {
                    if (!issued)
                    {
                        // BeginWrite threw, or EndWrite threw on the synchronous path (where
                        // WriteCallback returns without touching the flag). Nothing else will
                        // clear writePending in these cases -- assuming a throwing BeginWrite
                        // never invokes the callback -- so clear it here; otherwise a later
                        // WaitForWriteComplete (e.g. from Close) would block forever.
                        this.writePending = false;
                    }
                }
            }

            // Completion callback for writes that finished asynchronously.
            static void WriteCallback(IAsyncResult result)
            {
                // Synchronous completions are finished by Flush itself.
                if (result.CompletedSynchronously)
                    return;

                // Fetch our state information: ByteBuffer
                ByteBuffer buffer = (ByteBuffer)result.AsyncState;

                try
                {
                    buffer.stream.EndWrite(result);
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw;
                    }
                    buffer.completionException = e;
                }

                // Tell the main thread we've finished.
                lock (buffer.ThisLock)
                {
                    buffer.writePending = false;

                    // Do not Pulse if no one is waiting, to avoid the overhead of Pulse
                    if (!buffer.waiting)
                        return;

                    Monitor.Pulse(buffer.ThisLock);
                }
            }

            // Blocks until this buffer has no write in flight, then rethrows any
            // exception recorded by the completion callback.
            internal void WaitForWriteComplete()
            {
                lock (ThisLock)
                {
                    // Re-check the condition after every wake-up instead of assuming
                    // that being woken means the write has finished.
                    while (this.writePending)
                    {
                        // Wait until the async write of this buffer is finished.
                        this.waiting = true;
                        Monitor.Wait(ThisLock);
                        this.waiting = false;
                    }
                }

                // Raise exception if necessary
                if (this.completionException != null)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(completionException);
                }
            }

            // Appends count bytes from buffer[offset..] to this buffer. The caller must
            // ensure the data fits and that no write is pending (checked only by debug asserts).
            internal void CopyData(byte[] buffer, int offset, int count)
            {
                DiagnosticUtility.DebugAssert(this.position + count <= this.bytes.Length,
                    string.Format(CultureInfo.InvariantCulture, "Chunk is too big to fit in this buffer. Chunk size={0}, free space={1}", count, this.bytes.Length - this.position));
                DiagnosticUtility.DebugAssert(!this.writePending,
                    string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position));

                Buffer.BlockCopy(buffer, offset, this.bytes, this.position, count);
                this.position += count;
            }

            // Appends a single byte. The caller must ensure there is room and that no
            // write is pending (checked only by debug asserts).
            internal void CopyData(byte value)
            {
                DiagnosticUtility.DebugAssert(this.position < this.bytes.Length, "Buffer is full");
                DiagnosticUtility.DebugAssert(!this.writePending,
                    string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position));

                this.bytes[this.position++] = value;
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- counter.cs
- InvariantComparer.cs
- DecoderFallback.cs
- WebSysDescriptionAttribute.cs
- HttpPostedFileWrapper.cs
- Translator.cs
- MimeWriter.cs
- BooleanAnimationBase.cs
- MessageEncoderFactory.cs
- Polyline.cs
- safemediahandle.cs
- EventBuilder.cs
- OleDbParameterCollection.cs
- InputLanguage.cs
- ObjectStateFormatter.cs
- OleDbTransaction.cs
- IISMapPath.cs
- SimpleRecyclingCache.cs
- Dispatcher.cs
- CommonXSendMessage.cs
- DrawingImage.cs
- SystemTcpConnection.cs
- WebBrowserProgressChangedEventHandler.cs
- WebPermission.cs
- ObjectDataSource.cs
- MDIWindowDialog.cs
- InvokePattern.cs
- ClientSession.cs
- MatrixUtil.cs
- DesigntimeLicenseContextSerializer.cs
- OleDbReferenceCollection.cs
- DataBindingCollectionConverter.cs
- XmlDocument.cs
- SchemaMapping.cs
- NativeMethodsOther.cs
- UnsafeNativeMethods.cs
- ControlEvent.cs
- MediaContext.cs
- ColorConvertedBitmap.cs
- SiteMapProvider.cs
- SwitchAttribute.cs
- EntityType.cs
- GeometryHitTestParameters.cs
- _FtpControlStream.cs
- ParameterElementCollection.cs
- ErrorStyle.cs
- BrowserCapabilitiesFactoryBase.cs
- QilIterator.cs
- PackageDigitalSignatureManager.cs
- SingleKeyFrameCollection.cs
- ProcessModuleCollection.cs
- XamlReaderConstants.cs
- ButtonBaseAutomationPeer.cs
- ConfigXmlText.cs
- DataGridViewTopLeftHeaderCell.cs
- LocatorGroup.cs
- CellParaClient.cs
- WebConvert.cs
- InfoCardAsymmetricCrypto.cs
- PropertyKey.cs
- SQLSingleStorage.cs
- OleDbSchemaGuid.cs
- XmlTextWriter.cs
- ResourceAttributes.cs
- DataGridViewSelectedColumnCollection.cs
- URL.cs
- UMPAttributes.cs
- GeometryGroup.cs
- AuthorizationRule.cs
- XsltOutput.cs
- DayRenderEvent.cs
- TagNameToTypeMapper.cs
- StringResourceManager.cs
- TransformConverter.cs
- DataGridViewSelectedCellCollection.cs
- MouseActionValueSerializer.cs
- ToolBar.cs
- Converter.cs
- CodeFieldReferenceExpression.cs
- MappingItemCollection.cs
- ColorConvertedBitmap.cs
- BackgroundWorker.cs
- FactoryGenerator.cs
- HuffmanTree.cs
- ExceptionWrapper.cs
- DiagnosticTrace.cs
- SessionState.cs
- Storyboard.cs
- BadImageFormatException.cs
- SoapSchemaMember.cs
- OracleCommand.cs
- HttpConfigurationSystem.cs
- ResolvedKeyFrameEntry.cs
- XmlDownloadManager.cs
- HandoffBehavior.cs
- FamilyTypeface.cs
- PeerTransportCredentialType.cs
- typedescriptorpermissionattribute.cs
- TreeNodeStyleCollection.cs
- KnownBoxes.cs