From be81407f724f778bc6706a3555f9dc408da05f6d Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Tue, 21 Apr 2026 10:54:48 +0000 Subject: [PATCH 1/3] feat: Add buffered and resumed writer connection logic --- .../async/writer_connection_buffered.cc | 14 +- .../async/writer_connection_buffered_test.cc | 61 +++++++++ .../async/writer_connection_resumed.cc | 14 +- .../async/writer_connection_resumed_test.cc | 123 ++++++++++++++++++ 4 files changed, 206 insertions(+), 6 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_buffered.cc b/google/cloud/storage/internal/async/writer_connection_buffered.cc index 786a33b3b89cd..a67810fbda0c6 100644 --- a/google/cloud/storage/internal/async/writer_connection_buffered.cc +++ b/google/cloud/storage/internal/async/writer_connection_buffered.cc @@ -281,7 +281,8 @@ class AsyncWriterConnectionBufferedState return tmp; } - void OnQuery(std::unique_lock lk, std::int64_t persisted_size) { + void OnQuery(std::unique_lock lk, std::int64_t persisted_size, + bool is_resume = false) { if (persisted_size < buffer_offset_) { auto id = UploadId(lk); return SetError(std::move(lk), @@ -297,7 +298,14 @@ class AsyncWriterConnectionBufferedState } resend_buffer_.RemovePrefix(static_cast(n)); buffer_offset_ = persisted_size; - write_offset_ -= static_cast(n); + if (is_resume) { + // Since the buffer has been modified to start exactly at the point of the + // resume, the next write on this new stream should start from the + // beginning of this truncated buffer. + write_offset_ = 0; + } else { + write_offset_ -= static_cast(n); + } // If the buffer is small enough, collect all the handlers to notify them. auto const handlers = ClearHandlersIfEmpty(lk); // SetFlushed will release the lock before returning. @@ -382,7 +390,7 @@ class AsyncWriterConnectionBufferedState std::move(state))); } // Regular resume succeeded, object not finalized. Continue writing. - OnQuery(std::move(lk), absl::get(state)); + OnQuery(std::move(lk), absl::get(state), /*is_resume=*/true); } void SetFinalized(std::unique_lock lk, diff --git a/google/cloud/storage/internal/async/writer_connection_buffered_test.cc b/google/cloud/storage/internal/async/writer_connection_buffered_test.cc index 40758de3c17e6..ed01b4581a3f0 100644 --- a/google/cloud/storage/internal/async/writer_connection_buffered_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_buffered_test.cc @@ -1266,6 +1266,67 @@ TEST(WriteConnectionBuffered, SetFinalizedIsIdempotent) { next.first.set_value(true); } +TEST(WriteConnectionBuffered, ResetWriteOffsetOnResume) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + auto* mock_ptr = mock.get(); + + EXPECT_CALL(*mock_ptr, UploadId).WillRepeatedly(Return("test-upload-id")); + EXPECT_CALL(*mock_ptr, PersistedState) + .WillOnce( + Return(MakePersistedState(0))); // Initial state: 0 bytes persisted. + + EXPECT_CALL(*mock_ptr, Write).WillOnce([&](auto) { + return sequencer.PushBack("Write").then([](auto f) { + if (!f.get()) return TransientError(); // This write will fail. + return Status{}; + }); + }); + + MockFactory mock_factory; + auto resumed_mock = std::make_unique(); + auto* resumed_mock_ptr = resumed_mock.get(); + + EXPECT_CALL(mock_factory, Call).WillOnce([&]() { + return sequencer.PushBack("Resume").then([&](auto) { + // The resumed connection reports that 1024 bytes have been persisted. + EXPECT_CALL(*resumed_mock_ptr, PersistedState) + .WillRepeatedly(Return(MakePersistedState(1024))); + // We expect the next write on the resumed stream to send the remaining + // 1024 bytes. If the write offset was not reset to 0, this size would be + // incorrect. + EXPECT_CALL(*resumed_mock_ptr, Write).WillOnce([&](auto payload) { + EXPECT_EQ(payload.size(), 1024); + return sequencer.PushBack("ResumedWrite").then([](auto) { + return Status{}; + }); + }); + return make_status_or(std::unique_ptr( + std::move(resumed_mock))); + }); + }); + + auto connection = MakeWriterConnectionBuffered( + mock_factory.AsStdFunction(), std::move(mock), TestOptions()); + + // Write a total of 2048 bytes. + auto write = connection->Write(TestPayload(2048)); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write"); + next.first.set_value(false); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Resume"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "ResumedWrite"); + next.first.set_value(true); + + EXPECT_STATUS_OK(write.get()); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 3b860bef02a6e..a16d7c8ef2c70 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -298,7 +298,8 @@ class AsyncWriterConnectionResumedState return tmp; } - void OnQuery(std::unique_lock lk, std::int64_t persisted_size) { + void OnQuery(std::unique_lock lk, std::int64_t persisted_size, + bool is_resume = false) { auto handle = impl_->WriteHandle(); if (handle) { latest_write_handle_ = *std::move(handle); @@ -317,7 +318,14 @@ class AsyncWriterConnectionResumedState } resend_buffer_.RemovePrefix(static_cast(n)); buffer_offset_ = persisted_size; - write_offset_ -= static_cast(n); + if (is_resume) { + // Since the buffer has been modified to start exactly at the point of the + // resume, the next write on this new stream should start from the + // beginning of this truncated buffer. + write_offset_ = 0; + } else { + write_offset_ -= static_cast(n); + } // If the buffer is small enough, collect all the handlers to notify them. auto const handlers = ClearHandlersIfEmpty(lk); state_ = State::kIdle; @@ -436,7 +444,7 @@ class AsyncWriterConnectionResumedState options_, initial_request_, std::move(res->stream), hash_function_, persisted_offset, false); // OnQuery will restart the WriteLoop if necessary. - OnQuery(std::move(lk), persisted_offset); + OnQuery(std::move(lk), persisted_offset, /*is_resume=*/true); } void SetFinalized(std::unique_lock lk, diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index c274b683c4287..839b5fdfa6507 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/storage/internal/async/writer_connection_resumed.h" +#include "google/cloud/mocks/mock_async_streaming_read_write_rpc.h" #include "google/cloud/storage/async/connection.h" #include "google/cloud/storage/mocks/mock_async_writer_connection.h" #include "google/cloud/storage/testing/canonical_errors.h" @@ -615,6 +616,128 @@ TEST(WriterConnectionResumed, OnQueryUpdatesWriteHandle) { EXPECT_EQ(current_handle->handle(), "updated-handle"); } +TEST(WriterConnectionResumed, ResetWriteOffsetOnResume) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + auto* mock_ptr = mock.get(); + + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + google::storage::v2::BidiWriteObjectResponse first_response; + first_response.mutable_write_handle()->set_handle("initial-handle"); + + auto mock_hash = + std::make_shared(); + EXPECT_CALL(*mock_hash, Update(::testing::An(), + ::testing::An(), + ::testing::An())) + .WillRepeatedly(Return(Status())); + + EXPECT_CALL(*mock_ptr, PersistedState) + .WillOnce( + Return(MakePersistedState(0))) // Initial state: 0 bytes persisted. + .WillOnce(Return( + MakePersistedState(1024))); // Resumed state: 1024 bytes persisted. + + EXPECT_CALL(*mock_ptr, Flush(_)).WillOnce([&](auto) { + return sequencer.PushBack("Flush").then([](auto f) { + if (f.get()) return Status{}; + return TransientError(); // Return a transient error to trigger resume. + }); + }); + + MockFactory mock_factory; + auto mock_stream = + std::make_unique>(); + auto* mock_stream_ptr = mock_stream.get(); + + // The mock factory is called when the connection resumes. + EXPECT_CALL(mock_factory, Call(_)) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest) { + WriteObject::WriteResult result; + result.stream = std::move(mock_stream); + result.first_response.mutable_write_handle()->set_handle("new-handle"); + return sequencer.PushBack("Factory").then( + [r = std::move(result)](auto) mutable { + return StatusOr(std::move(r)); + }); + }); + + // After resuming, the connection should write the remaining payload. + EXPECT_CALL(*mock_stream_ptr, Write(_, _)) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions) { + // We expect the next write on the resumed stream to send the remaining + // 1024 bytes. If the write offset was not reset to 0, this size would + // be incorrect. + EXPECT_EQ(request.checksummed_data().content().size(), 1024); + return sequencer.PushBack("StreamWrite").then([](auto) { + return true; + }); + }) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions) { + // Expect a final "ghost" write to flush. + EXPECT_TRUE(request.checksummed_data().content().empty()); + EXPECT_TRUE(request.flush()); + return sequencer.PushBack("GhostWrite").then([](auto) { return true; }); + }); + + google::storage::v2::BidiWriteObjectResponse read_response1; + read_response1.set_persisted_size(2048); + google::storage::v2::BidiWriteObjectResponse read_response2; + read_response2.set_persisted_size(2048); + EXPECT_CALL(*mock_stream_ptr, Read) + .WillOnce([&, read_response1]() { + return sequencer.PushBack("StreamRead1").then([read_response1](auto) { + return absl::make_optional(read_response1); + }); + }) + .WillOnce([&, read_response2]() { + return sequencer.PushBack("StreamRead2").then([read_response2](auto) { + return absl::make_optional(read_response2); + }); + }); + + EXPECT_CALL(*mock_stream_ptr, Finish) + .WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*mock_stream_ptr, Cancel).WillRepeatedly(Return()); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, mock_hash, + first_response, Options{}); + + // Write a total of 2048 bytes. + auto write = connection->Write(TestPayload(2048)); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Flush"); + next.first.set_value(false); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Factory"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead1"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "GhostWrite"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "StreamRead2"); + next.first.set_value(true); + + EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk)); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal From 244a1e1d23b785258c5389b1213026f0f1bf1a14 Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Wed, 22 Apr 2026 06:24:27 +0000 Subject: [PATCH 2/3] fix: Address issues from CI checks --- .../async/writer_connection_buffered.cc | 10 ++++++- .../async/writer_connection_resumed.cc | 10 ++++++- .../async/writer_connection_resumed_test.cc | 29 ++++++++----------- 3 files changed, 30 insertions(+), 19 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_buffered.cc b/google/cloud/storage/internal/async/writer_connection_buffered.cc index a67810fbda0c6..ec179e207b516 100644 --- a/google/cloud/storage/internal/async/writer_connection_buffered.cc +++ b/google/cloud/storage/internal/async/writer_connection_buffered.cc @@ -304,7 +304,15 @@ class AsyncWriterConnectionBufferedState // beginning of this truncated buffer. write_offset_ = 0; } else { - write_offset_ -= static_cast(n); + // While rare, it is possible that n >= write_offset_ (i.e. the server has + // persisted more than we have sent) if, for example, multiple clients + // resume the same upload. If that is the case, all the bytes covered by + // write_offset_ have been flushed and we can reset it to 0. + if (static_cast(n) >= write_offset_) { + write_offset_ = 0; + } else { + write_offset_ -= static_cast(n); + } } // If the buffer is small enough, collect all the handlers to notify them. auto const handlers = ClearHandlersIfEmpty(lk); diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index a16d7c8ef2c70..4ca095b15139e 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -324,7 +324,15 @@ class AsyncWriterConnectionResumedState // beginning of this truncated buffer. write_offset_ = 0; } else { - write_offset_ -= static_cast(n); + // While rare, it is possible that n >= write_offset_ (i.e. the server has + // persisted more than we have sent) if, for example, multiple clients + // resume the same upload. If that is the case, all the bytes covered by + // write_offset_ have been flushed and we can reset it to 0. + if (static_cast(n) >= write_offset_) { + write_offset_ = 0; + } else { + write_offset_ -= static_cast(n); + } } // If the buffer is small enough, collect all the handlers to notify them. auto const handlers = ClearHandlersIfEmpty(lk); diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index 839b5fdfa6507..dcebb80ac8c63 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -11,10 +11,10 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - #include "google/cloud/storage/internal/async/writer_connection_resumed.h" #include "google/cloud/mocks/mock_async_streaming_read_write_rpc.h" #include "google/cloud/storage/async/connection.h" +#include "google/cloud/storage/internal/grpc/ctype_cord_workaround.h" #include "google/cloud/storage/mocks/mock_async_writer_connection.h" #include "google/cloud/storage/testing/canonical_errors.h" #include "google/cloud/storage/testing/mock_hash_function.h" @@ -633,15 +633,15 @@ TEST(WriterConnectionResumed, ResetWriteOffsetOnResume) { .WillRepeatedly(Return(Status())); EXPECT_CALL(*mock_ptr, PersistedState) - .WillOnce( - Return(MakePersistedState(0))) // Initial state: 0 bytes persisted. - .WillOnce(Return( - MakePersistedState(1024))); // Resumed state: 1024 bytes persisted. + .WillOnce(Return(MakePersistedState(0))) + .WillOnce(Return(MakePersistedState(1024))); + + auto const payload = TestPayload(2048); EXPECT_CALL(*mock_ptr, Flush(_)).WillOnce([&](auto) { return sequencer.PushBack("Flush").then([](auto f) { if (f.get()) return Status{}; - return TransientError(); // Return a transient error to trigger resume. + return TransientError(); }); }); @@ -652,9 +652,8 @@ TEST(WriterConnectionResumed, ResetWriteOffsetOnResume) { google::storage::v2::BidiWriteObjectResponse>>(); auto* mock_stream_ptr = mock_stream.get(); - // The mock factory is called when the connection resumes. EXPECT_CALL(mock_factory, Call(_)) - .WillOnce([&](google::storage::v2::BidiWriteObjectRequest) { + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&) { WriteObject::WriteResult result; result.stream = std::move(mock_stream); result.first_response.mutable_write_handle()->set_handle("new-handle"); @@ -664,22 +663,19 @@ TEST(WriterConnectionResumed, ResetWriteOffsetOnResume) { }); }); - // After resuming, the connection should write the remaining payload. EXPECT_CALL(*mock_stream_ptr, Write(_, _)) .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, grpc::WriteOptions) { - // We expect the next write on the resumed stream to send the remaining - // 1024 bytes. If the write offset was not reset to 0, this size would - // be incorrect. - EXPECT_EQ(request.checksummed_data().content().size(), 1024); + EXPECT_EQ(GetContent(request.checksummed_data()).size(), 1024); + EXPECT_EQ(GetContent(request.checksummed_data()), + std::string(1024, 'A')); return sequencer.PushBack("StreamWrite").then([](auto) { return true; }); }) .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, grpc::WriteOptions) { - // Expect a final "ghost" write to flush. - EXPECT_TRUE(request.checksummed_data().content().empty()); + EXPECT_TRUE(GetContent(request.checksummed_data()).empty()); EXPECT_TRUE(request.flush()); return sequencer.PushBack("GhostWrite").then([](auto) { return true; }); }); @@ -708,8 +704,7 @@ TEST(WriterConnectionResumed, ResetWriteOffsetOnResume) { mock_factory.AsStdFunction(), std::move(mock), initial_request, mock_hash, first_response, Options{}); - // Write a total of 2048 bytes. - auto write = connection->Write(TestPayload(2048)); + auto write = connection->Write(payload); auto next = sequencer.PopFrontWithName(); EXPECT_EQ(next.second, "Flush"); From d8085c3406ed662c03929af4d45dd62ed5b9f45f Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Fri, 24 Apr 2026 10:15:22 +0000 Subject: [PATCH 3/3] fix: Address reviewer feedback --- .../storage/internal/async/writer_connection_resumed.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 4ca095b15139e..5ae78d307bd59 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -298,8 +298,7 @@ class AsyncWriterConnectionResumedState return tmp; } - void OnQuery(std::unique_lock lk, std::int64_t persisted_size, - bool is_resume = false) { + void OnQuery(std::unique_lock lk, std::int64_t persisted_size) { auto handle = impl_->WriteHandle(); if (handle) { latest_write_handle_ = *std::move(handle); @@ -318,7 +317,7 @@ class AsyncWriterConnectionResumedState } resend_buffer_.RemovePrefix(static_cast(n)); buffer_offset_ = persisted_size; - if (is_resume) { + if (state_ == State::kResuming) { // Since the buffer has been modified to start exactly at the point of the // resume, the next write on this new stream should start from the // beginning of this truncated buffer. @@ -452,7 +451,7 @@ class AsyncWriterConnectionResumedState options_, initial_request_, std::move(res->stream), hash_function_, persisted_offset, false); // OnQuery will restart the WriteLoop if necessary. - OnQuery(std::move(lk), persisted_offset, /*is_resume=*/true); + OnQuery(std::move(lk), persisted_offset); } void SetFinalized(std::unique_lock lk,