diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 85dbc333ebe9d8..ed4f40b6cedbad 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -495,6 +495,7 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu _lastExaminedIndex = examinedSegment.RunningIndex + examinedIndex; Debug.Assert(_unconsumedBytes >= 0, "Length has gone negative"); + Debug.Assert(ResumeWriterThreshold >= 1, "ResumeWriterThreshold is less than 1"); if (oldLength >= ResumeWriterThreshold && _unconsumedBytes < ResumeWriterThreshold) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs index 998845f804dd33..a196f6029e97a7 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs @@ -63,7 +63,15 @@ public PipeOptions( { resumeWriterThreshold = DefaultResumeWriterThreshold; } - else if (resumeWriterThreshold < 0 || resumeWriterThreshold > pauseWriterThreshold) + else if (resumeWriterThreshold == 0) + { + // A resumeWriterThreshold of 0 makes no sense because the writer could never resume if paused. + // By setting it to 1, the writer will resume only after all data is consumed. + resumeWriterThreshold = 1; + } + + // Only validate that the resumeWriterThreshold is not too large if the writer could actually pause. + if (resumeWriterThreshold < 0 || (pauseWriterThreshold > 0 && resumeWriterThreshold > pauseWriterThreshold)) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.resumeWriterThreshold); } diff --git a/src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs index 0ed95338a2413c..00432d8ddcc5d6 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs @@ -47,7 +47,7 @@ public override void Schedule(Action action, object state) public void CompletingReaderFromWriterCallbackWorks() { var callbackRan = false; - var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false)); + var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, resumeWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false)); pipe.Writer.OnReaderCompleted((exception, state) => { pipe.Writer.Complete(); }, null); @@ -61,7 +61,7 @@ public void CompletingReaderFromWriterCallbackWorks() public void CompletingWriterFromReaderCallbackWorks() { var callbackRan = false; - var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false)); + var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, resumeWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false)); pipe.Reader.OnWriterCompleted((exception, state) => { pipe.Reader.Complete(); }, null); @@ -201,7 +201,7 @@ public void OnReaderCompletedRanBeforeFlushContinuation() { var callbackRan = false; var continuationRan = false; - var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false)); + var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, resumeWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false)); pipe.Writer.OnReaderCompleted( (exception, state) => diff --git a/src/libraries/System.IO.Pipelines/tests/PipeOptionsTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeOptionsTests.cs index b6628bc535fd61..d8621f4cc72cd1 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeOptionsTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeOptionsTests.cs @@ -33,6 +33,7 @@ public void InvalidArgs_Throws() AssertExtensions.Throws("pauseWriterThreshold", () => new PipeOptions(pauseWriterThreshold: -2)); AssertExtensions.Throws("resumeWriterThreshold", () => new PipeOptions(resumeWriterThreshold: -2)); AssertExtensions.Throws("resumeWriterThreshold", () => new PipeOptions(pauseWriterThreshold: 50, resumeWriterThreshold: 100)); + AssertExtensions.Throws("resumeWriterThreshold", () => new PipeOptions(pauseWriterThreshold: 1, resumeWriterThreshold: -1)); } [Theory]