Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / BufferedOutputAsyncStream.cs / 1 / BufferedOutputAsyncStream.cs
//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------- namespace System.ServiceModel.Channels { using System; using System.Diagnostics; using System.Globalization; using System.IO; using System.ServiceModel; using System.Collections; using System.Collections.Generic; using System.Threading; ////// /// BufferedOutputAsyncStream is used for writing streamed response. /// For performance reasons, the behavior we want is chunk, chunk, chunk,.. terminating chunk without a delay. /// We call BeginWrite,BeginWrite,BeginWrite and Close()(close sends the terminating chunk) without /// waiting for all outstanding BeginWrites to complete. /// /// BufferedOutputAsyncStream is not a general-purpose stream wrapper, it requires that the base stream /// 1. allow concurrent IO (for multiple BeginWrite calls) /// 2. support the BeginWrite,BeginWrite,BeginWrite,.. Close() calling pattern. /// /// Currently BufferedOutputAsyncStream only used to wrap the System.Net.HttpResponseStream, which satisfy both requirements. /// /// class BufferedOutputAsyncStream : Stream { Stream stream; int bufferSize; int bufferLimit; Listbuffers; int currentIndex; internal BufferedOutputAsyncStream(Stream stream, int bufferSize, int bufferLimit) { this.stream = stream; this.bufferSize = bufferSize; this.bufferLimit = bufferLimit; this.buffers = new List (); this.buffers.Add(new ByteBuffer(this.bufferSize, stream)); this.currentIndex = 0; } public override bool CanRead { get { return false; } } public override bool CanSeek { get { return false; } } public override bool CanWrite { get { return stream.CanWrite; } } ByteBuffer CurrentBuffer { get { return buffers[currentIndex]; } } public override long Length { get { #pragma warning suppress 56503 // [....], required by the Stream.Length contract throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported))); } } public override long Position { get { #pragma warning suppress 56503 // [....], required by the Stream.Position contract throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported))); } set { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported))); } } public override void Close() { CurrentBuffer.Flush(); stream.Close(); // Complete all outstanding writes WaitForAllWritesToComplete(); } public override void Flush() { CurrentBuffer.Flush(); stream.Flush(); } void NextBuffer() { currentIndex++; if (currentIndex == buffers.Count) { if (buffers.Count < bufferLimit) { // allocate new buffer buffers.Add(new ByteBuffer(bufferSize, stream)); return; } currentIndex = 0; } DiagnosticUtility.DebugAssert(currentIndex >= 0 && currentIndex < buffers.Count, string.Format(CultureInfo.InvariantCulture, "The value ({0}) must be greater than or equal to zero and less than {1}", currentIndex, buffers.Count)); // Make Sure that the buffer is ready to receive data CurrentBuffer.WaitForWriteComplete(); } public override int Read(byte[] buffer, int offset, int count) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported))); } public override int ReadByte() { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported))); } public override long Seek(long offset, SeekOrigin origin) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported))); } public override void SetLength(long value) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported))); } void WaitForAllWritesToComplete() { // Complete all outstanding writes for (int i = 0; i < buffers.Count; i++) { buffers[i].WaitForWriteComplete(); } } public override void Write(byte[] buffer, int offset, int count) { while (count > 0) { if (CurrentBuffer.IsWritePending) { NextBuffer(); } int freeBytes = CurrentBuffer.FreeBytes; // space left in the CurrentBuffer if (freeBytes > 0) { if (freeBytes > count) freeBytes = count; CurrentBuffer.CopyData(buffer, offset, freeBytes); offset += freeBytes; count -= freeBytes; } if (CurrentBuffer.FreeBytes == 0) { CurrentBuffer.Flush(); } } } public override void WriteByte(byte value) { if (CurrentBuffer.IsWritePending) { NextBuffer(); } CurrentBuffer.CopyData(value); if (CurrentBuffer.FreeBytes == 0) { CurrentBuffer.Flush(); } } class ByteBuffer { byte[] bytes; int position = 0; Stream stream; static AsyncCallback writeCallback = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(WriteCallback)); bool writePending = false; bool waiting = false; Exception completionException; internal ByteBuffer(int bufferSize, Stream stream) { this.bytes = DiagnosticUtility.Utility.AllocateByteArray(bufferSize); this.stream = stream; } internal bool IsWritePending { get { return writePending; } } object ThisLock { get { return this; } } internal int FreeBytes { get { return this.bytes.Length - this.position; } } internal void Flush() { if (this.position <= 0) return; int bytesToWrite = this.position; this.writePending = true; this.position = 0; IAsyncResult asyncResult = this.stream.BeginWrite(this.bytes, 0, bytesToWrite, writeCallback, this); if (asyncResult.CompletedSynchronously) { this.stream.EndWrite(asyncResult); this.writePending = false; } } static void WriteCallback(IAsyncResult result) { if (result.CompletedSynchronously) return; // Fetch our state information: ByteBuffer ByteBuffer buffer = (ByteBuffer)result.AsyncState; try { buffer.stream.EndWrite(result); } #pragma warning suppress 56500 // [....], transferring exception to another thread catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } buffer.completionException = e; } // Tell the main thread we've finished. lock (buffer.ThisLock) { buffer.writePending = false; // Do not Pulse if no one is waiting, to avoid the overhead of Pulse if (!buffer.waiting) return; Monitor.Pulse(buffer.ThisLock); } } internal void WaitForWriteComplete() { lock (ThisLock) { if (this.writePending) { // Wait until the async write of this buffer is finished. this.waiting = true; Monitor.Wait(ThisLock); this.waiting = false; } } // Raise exception if necessary if (this.completionException != null) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(completionException); } } internal void CopyData(byte[] buffer, int offset, int count) { DiagnosticUtility.DebugAssert(this.position + count <= this.bytes.Length, string.Format(CultureInfo.InvariantCulture, "Chunk is too big to fit in this buffer. Chunk size={0}, free space={1}", count, this.bytes.Length - this.position)); DiagnosticUtility.DebugAssert(!this.writePending, string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position)); Buffer.BlockCopy(buffer, offset, this.bytes, this.position, count); this.position += count; } internal void CopyData(byte value) { DiagnosticUtility.DebugAssert(this.position < this.bytes.Length, "Buffer is full"); DiagnosticUtility.DebugAssert(!this.writePending, string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position)); this.bytes[this.position++] = value; } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- unsafeIndexingFilterStream.cs
- ContextDataSourceContextData.cs
- WindowsPrincipal.cs
- Int16.cs
- MenuScrollingVisibilityConverter.cs
- ImportCatalogPart.cs
- HebrewCalendar.cs
- CapabilitiesRule.cs
- BlurBitmapEffect.cs
- TypeToStringValueConverter.cs
- Sql8ConformanceChecker.cs
- SafeSecurityHandles.cs
- SystemInfo.cs
- TypeSource.cs
- TextSelection.cs
- Expressions.cs
- TemplateNameScope.cs
- StateManagedCollection.cs
- EntityDataSource.cs
- HMACSHA512.cs
- NativeMethods.cs
- ToolStripMenuItem.cs
- ToolBarButton.cs
- securitycriticaldataformultiplegetandset.cs
- NativeMethods.cs
- MarshalByRefObject.cs
- CrossContextChannel.cs
- EntitySqlQueryState.cs
- DoubleUtil.cs
- WsiProfilesElementCollection.cs
- WorkflowElementDialog.cs
- ListViewTableCell.cs
- DataGridViewDataErrorEventArgs.cs
- ListView.cs
- OracleTransaction.cs
- SettingsAttributes.cs
- WmlFormAdapter.cs
- ExtensibleClassFactory.cs
- ImageField.cs
- QilVisitor.cs
- DataServiceHostFactory.cs
- GridViewRowEventArgs.cs
- PolicyAssertionCollection.cs
- MobileRedirect.cs
- ErrorLog.cs
- TextCompositionManager.cs
- FigureParagraph.cs
- Choices.cs
- StreamResourceInfo.cs
- TypeNameConverter.cs
- XmlAttributeOverrides.cs
- RepeatButton.cs
- TextEncodedRawTextWriter.cs
- FilterQuery.cs
- UpdatableWrapper.cs
- AddInPipelineAttributes.cs
- EventRouteFactory.cs
- DefinitionBase.cs
- LogicalTreeHelper.cs
- ContractMapping.cs
- HostingEnvironmentException.cs
- WebRequest.cs
- XMLUtil.cs
- ConstructorBuilder.cs
- PageStatePersister.cs
- HtmlInputControl.cs
- InstanceKeyNotReadyException.cs
- DbConnectionStringCommon.cs
- Invariant.cs
- oledbmetadatacollectionnames.cs
- DbParameterHelper.cs
- AutomationIdentifier.cs
- TextEditorCharacters.cs
- Evidence.cs
- MailWriter.cs
- PersonalizationEntry.cs
- FileUtil.cs
- DetailsViewDeleteEventArgs.cs
- HtmlControlAdapter.cs
- MessageRpc.cs
- Model3D.cs
- RoamingStoreFileUtility.cs
- XsdCachingReader.cs
- SplashScreenNativeMethods.cs
- MenuItem.cs
- ScrollBarAutomationPeer.cs
- NativeActivity.cs
- TreeNodeSelectionProcessor.cs
- EntitySqlQueryBuilder.cs
- AvTraceFormat.cs
- DataGridViewControlCollection.cs
- GridViewItemAutomationPeer.cs
- DataPagerField.cs
- TextTrailingCharacterEllipsis.cs
- XmlHierarchyData.cs
- TextContainerChangedEventArgs.cs
- _StreamFramer.cs
- HierarchicalDataSourceConverter.cs
- HtmlMobileTextWriter.cs
- EpmCustomContentSerializer.cs