From f76cf5b9ddc42f0a60013a1d69380308e78533ec Mon Sep 17 00:00:00 2001 From: Connor Bullard <32420624+cdbullard@users.noreply.github.com> Date: Sun, 23 Oct 2022 16:05:58 -0400 Subject: [PATCH 1/6] Initial commit --- .../src/System/IO/Pipelines/PipeOptions.cs | 8 +++++++- .../System.IO.Pipelines/tests/PipeOptionsTests.cs | 2 ++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs index 998845f804dd33..401c6814b46341 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs @@ -63,11 +63,17 @@ public PipeOptions( { resumeWriterThreshold = DefaultResumeWriterThreshold; } - else if (resumeWriterThreshold < 0 || resumeWriterThreshold > pauseWriterThreshold) + else if (resumeWriterThreshold <= 0 || resumeWriterThreshold > pauseWriterThreshold) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.resumeWriterThreshold); } + // If resumeWriterThreshold is still larger than pauseWriterThreshold, need to throw exception to prevent system hanging + if (resumeWriterThreshold > pauseWriterThreshold) + { + ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.pauseWriterThreshold); + } + Pool = pool ?? MemoryPool.Shared; IsDefaultSharedMemoryPool = Pool == MemoryPool.Shared; ReaderScheduler = readerScheduler ?? PipeScheduler.ThreadPool; diff --git a/src/libraries/System.IO.Pipelines/tests/PipeOptionsTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeOptionsTests.cs index b6628bc535fd61..32a970c9fc660e 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeOptionsTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeOptionsTests.cs @@ -33,6 +33,8 @@ public void InvalidArgs_Throws() AssertExtensions.Throws("pauseWriterThreshold", () => new PipeOptions(pauseWriterThreshold: -2)); AssertExtensions.Throws("resumeWriterThreshold", () => new PipeOptions(resumeWriterThreshold: -2)); AssertExtensions.Throws("resumeWriterThreshold", () => new PipeOptions(pauseWriterThreshold: 50, resumeWriterThreshold: 100)); + AssertExtensions.Throws("pauseWriterThreshold", () => new PipeOptions(pauseWriterThreshold: 1, resumeWriterThreshold: -1)); + AssertExtensions.Throws("resumeWriterThreshold", () => new PipeOptions(pauseWriterThreshold: 1, resumeWriterThreshold: 0)); } [Theory] From 0be73d824682ce143f2fc47f843ef764ced786da Mon Sep 17 00:00:00 2001 From: Connor Bullard <32420624+cdbullard@users.noreply.github.com> Date: Mon, 24 Oct 2022 00:44:12 -0400 Subject: [PATCH 2/6] Updating unit tests and more concise logic in PipeOptions --- .../src/System/IO/Pipelines/Pipe.cs | 3 ++- .../src/System/IO/Pipelines/PipeOptions.cs | 15 +++++---------- .../tests/PipeCompletionCallbacksTests.cs | 6 +++--- .../System.IO.Pipelines/tests/PipeOptionsTests.cs | 3 +-- 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 85dbc333ebe9d8..906dd15e271879 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -497,7 +497,8 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu Debug.Assert(_unconsumedBytes >= 0, "Length has gone negative"); if (oldLength >= ResumeWriterThreshold && - _unconsumedBytes < ResumeWriterThreshold) + (_unconsumedBytes < ResumeWriterThreshold || + (_unconsumedBytes == 0 && ResumeWriterThreshold == 0))) { _writerAwaitable.Complete(out completionData); } diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs index 401c6814b46341..b959fabd4e09ca 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs @@ -54,25 +54,20 @@ public PipeOptions( { pauseWriterThreshold = DefaultPauseWriterThreshold; } - else if (pauseWriterThreshold < 0) - { - ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.pauseWriterThreshold); - } if (resumeWriterThreshold == -1) { resumeWriterThreshold = DefaultResumeWriterThreshold; } - else if (resumeWriterThreshold <= 0 || resumeWriterThreshold > pauseWriterThreshold) - { - ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.resumeWriterThreshold); - } - // If resumeWriterThreshold is still larger than pauseWriterThreshold, need to throw exception to prevent system hanging - if (resumeWriterThreshold > pauseWriterThreshold) + if (pauseWriterThreshold < 0) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.pauseWriterThreshold); } + else if (resumeWriterThreshold < 0 || resumeWriterThreshold > pauseWriterThreshold) + { + ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.resumeWriterThreshold); + } Pool = pool ?? MemoryPool.Shared; IsDefaultSharedMemoryPool = Pool == MemoryPool.Shared; diff --git a/src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs index 0ed95338a2413c..00432d8ddcc5d6 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeCompletionCallbacksTests.cs @@ -47,7 +47,7 @@ public override void Schedule(Action action, object state) public void CompletingReaderFromWriterCallbackWorks() { var callbackRan = false; - var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false)); + var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, resumeWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false)); pipe.Writer.OnReaderCompleted((exception, state) => { pipe.Writer.Complete(); }, null); @@ -61,7 +61,7 @@ public void CompletingReaderFromWriterCallbackWorks() public void CompletingWriterFromReaderCallbackWorks() { var callbackRan = false; - var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false)); + var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, resumeWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false)); pipe.Reader.OnWriterCompleted((exception, state) => { pipe.Reader.Complete(); }, null); @@ -201,7 +201,7 @@ public void OnReaderCompletedRanBeforeFlushContinuation() { var callbackRan = false; var continuationRan = false; - var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false)); + var pipe = new Pipe(new PipeOptions(_pool, pauseWriterThreshold: 5, resumeWriterThreshold: 5, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false)); pipe.Writer.OnReaderCompleted( (exception, state) => diff --git a/src/libraries/System.IO.Pipelines/tests/PipeOptionsTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeOptionsTests.cs index 32a970c9fc660e..d8621f4cc72cd1 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeOptionsTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeOptionsTests.cs @@ -33,8 +33,7 @@ public void InvalidArgs_Throws() AssertExtensions.Throws("pauseWriterThreshold", () => new PipeOptions(pauseWriterThreshold: -2)); AssertExtensions.Throws("resumeWriterThreshold", () => new PipeOptions(resumeWriterThreshold: -2)); AssertExtensions.Throws("resumeWriterThreshold", () => new PipeOptions(pauseWriterThreshold: 50, resumeWriterThreshold: 100)); - AssertExtensions.Throws("pauseWriterThreshold", () => new PipeOptions(pauseWriterThreshold: 1, resumeWriterThreshold: -1)); - AssertExtensions.Throws("resumeWriterThreshold", () => new PipeOptions(pauseWriterThreshold: 1, resumeWriterThreshold: 0)); + AssertExtensions.Throws("resumeWriterThreshold", () => new PipeOptions(pauseWriterThreshold: 1, resumeWriterThreshold: -1)); } [Theory] From e17d9f563bb94c6bfa6bbaff17e144937a7151f2 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Tue, 6 Dec 2022 13:43:41 -0800 Subject: [PATCH 3/6] Apply suggestions from code review --- .../System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 906dd15e271879..0a95bda9c52ab2 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -496,9 +496,9 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu Debug.Assert(_unconsumedBytes >= 0, "Length has gone negative"); + // ResumeWriterThreshold is always at least 1 if (oldLength >= ResumeWriterThreshold && - (_unconsumedBytes < ResumeWriterThreshold || - (_unconsumedBytes == 0 && ResumeWriterThreshold == 0))) + _unconsumedBytes < ResumeWriterThreshold) { _writerAwaitable.Complete(out completionData); } From 332c8f699d9b70e81ccf2a2a48d2edc02f27fe2b Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Tue, 6 Dec 2022 14:37:17 -0800 Subject: [PATCH 4/6] Update src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs --- .../System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 0a95bda9c52ab2..85341c4f55aa51 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -496,7 +496,8 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu Debug.Assert(_unconsumedBytes >= 0, "Length has gone negative"); - // ResumeWriterThreshold is always at least 1 + Debug.Assert(ResumeWriterThreshold >= 1); + if (oldLength >= ResumeWriterThreshold && _unconsumedBytes < ResumeWriterThreshold) { From a751233d492fb3a74db6343a7ea4c7dc7a9e0c66 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Tue, 6 Dec 2022 14:38:09 -0800 Subject: [PATCH 5/6] Update src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs --- .../System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 85341c4f55aa51..ed4f40b6cedbad 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -495,8 +495,7 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu _lastExaminedIndex = examinedSegment.RunningIndex + examinedIndex; Debug.Assert(_unconsumedBytes >= 0, "Length has gone negative"); - - Debug.Assert(ResumeWriterThreshold >= 1); + Debug.Assert(ResumeWriterThreshold >= 1, "ResumeWriterThreshold is less than 1"); if (oldLength >= ResumeWriterThreshold && _unconsumedBytes < ResumeWriterThreshold) From 2cd9ae0e90de9cd4dc6b054a03adc48d8bc69480 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Wed, 1 Nov 2023 17:20:19 -0700 Subject: [PATCH 6/6] Apply suggestions from code review --- .../src/System/IO/Pipelines/PipeOptions.cs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs index b959fabd4e09ca..a196f6029e97a7 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs @@ -54,17 +54,24 @@ public PipeOptions( { pauseWriterThreshold = DefaultPauseWriterThreshold; } + else if (pauseWriterThreshold < 0) + { + ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.pauseWriterThreshold); + } if (resumeWriterThreshold == -1) { resumeWriterThreshold = DefaultResumeWriterThreshold; } - - if (pauseWriterThreshold < 0) + else if (resumeWriterThreshold == 0) { - ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.pauseWriterThreshold); + // A resumeWriterThreshold of 0 makes no sense because the writer could never resume if paused. + // By setting it to 1, the writer will resume only after all data is consumed. + resumeWriterThreshold = 1; } - else if (resumeWriterThreshold < 0 || resumeWriterThreshold > pauseWriterThreshold) + + // Only validate that the resumeWriterThreshold is not too large if the writer could actually pause. + if (resumeWriterThreshold < 0 || (pauseWriterThreshold > 0 && resumeWriterThreshold > pauseWriterThreshold)) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.resumeWriterThreshold); }