From c6206f86e4edd9e19d7620eb9797dbc96cea47ef Mon Sep 17 00:00:00 2001 From: Yuri Golobokov Date: Thu, 23 Apr 2026 23:59:37 +0000 Subject: [PATCH] feat: recycle channel on consecutive new stream failures --- .../internal/channels/ChannelPoolDpImpl.java | 14 ++- .../channels/ChannelPoolDpImplTest.java | 94 +++++++++++++++++++ 2 files changed, 106 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java index 9ef2a486f1..e4d11eb60a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java @@ -65,6 +65,8 @@ public class ChannelPoolDpImpl implements ChannelPool { private static final String DEFAULT_LOG_NAME = "pool"; private static final AtomicInteger INDEX = new AtomicInteger(); + private static final int CONSECUTIVE_OPEN_FAILURE_THRESHOLD = 5; + private final String poolLogId; @VisibleForTesting volatile int minGroups; @@ -221,6 +223,7 @@ public void start(Listener responseListener, Metadata headers) { public void onBeforeSessionStart(PeerInfo peerInfo) { afeId = AfeId.extract(peerInfo); synchronized (ChannelPoolDpImpl.this) { + channelWrapper.consecutiveFailures = 0; rehomeChannel(channelWrapper, afeId); sessionsPerAfeId.add(afeId); } @@ -232,6 +235,8 @@ public void onClose(Status status, Metadata trailers) { synchronized (ChannelPoolDpImpl.this) { if (afeId != null) { sessionsPerAfeId.remove(afeId); + } else if (!status.isOk() && status.getCode() != Code.CANCELLED) { + channelWrapper.consecutiveFailures++; } releaseChannel(channelWrapper, status); } @@ -306,12 +311,12 @@ private void releaseChannel(ChannelWrapper channelWrapper, Status status) { channelWrapper.group.numStreams--; channelWrapper.numOutstanding--; - if (shouldRecycleChannel(status)) { + if (shouldRecycleChannel(channelWrapper, status)) { recycleChannel(channelWrapper); } } - private static boolean shouldRecycleChannel(Status status) { + private static boolean shouldRecycleChannel(ChannelWrapper channelWrapper, Status status) { if (status.getCode() == Code.UNIMPLEMENTED) { return true; } @@ -322,6 +327,10 @@ private static boolean shouldRecycleChannel(Status status) { return true; } + if (channelWrapper.consecutiveFailures >= CONSECUTIVE_OPEN_FAILURE_THRESHOLD) { + return true; + } + return false; } @@ -480,6 +489,7 @@ static class ChannelWrapper { private final ManagedChannel channel; private final Instant createdAt; private int numOutstanding = 0; + private int consecutiveFailures = 0; public ChannelWrapper(AfeChannelGroup group, ManagedChannel channel, Clock clock) { this.group = group; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java index b5e2de38fa..9bb5ad1ece 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java @@ -469,4 +469,98 @@ void testRecycledChannelDoesNotRejoinPool() throws InterruptedException { pool.close(); } + + @Test + void testRecycleChannelOnConsecutiveFailures() { + when(channelSupplier.get()).thenReturn(channel); + when(channel.newCall(any(), any())).thenReturn(clientCall); + doNothing().when(clientCall).start(listener.capture(), any()); + + ChannelPoolDpImpl pool = + new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor); + + for (int i = 0; i < 4; i++) { + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(mock(Listener.class), new Metadata()); + listener.getValue().onClose(Status.UNAVAILABLE, new Metadata()); + + // Should not be recycled yet + verify(channel, times(0)).shutdown(); + verify(channelSupplier, times(1)).get(); + } + + // 5th failure + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(mock(Listener.class), new Metadata()); + listener.getValue().onClose(Status.UNAVAILABLE, new Metadata()); + + // Now it should be recycled + verify(channel, times(1)).shutdown(); + verify(channelSupplier, times(2)).get(); + + pool.close(); + } + + @Test + void testResetConsecutiveFailuresOnSuccess() { + when(channelSupplier.get()).thenReturn(channel); + when(channel.newCall(any(), any())).thenReturn(clientCall); + doNothing().when(clientCall).start(listener.capture(), any()); + doReturn(Attributes.EMPTY).when(clientCall).getAttributes(); + + ChannelPoolDpImpl pool = + new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor); + + // 4 failures + for (int i = 0; i < 4; i++) { + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(mock(Listener.class), new Metadata()); + listener.getValue().onClose(Status.UNAVAILABLE, new Metadata()); + } + verify(channel, times(0)).shutdown(); + + // A success: onHeaders (which calls onBeforeSessionStart) + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(mock(Listener.class), new Metadata()); + + PeerInfo peerInfo = PeerInfo.newBuilder().setApplicationFrontendId(555).build(); + Metadata headers = new Metadata(); + headers.put( + SessionStreamImpl.PEER_INFO_KEY, + Base64.getEncoder().encodeToString(peerInfo.toByteArray())); + listener.getValue().onHeaders(headers); + listener.getValue().onClose(Status.OK, new Metadata()); + + // Another 4 failures - should still not recycle because counter was reset + for (int i = 0; i < 4; i++) { + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(mock(Listener.class), new Metadata()); + listener.getValue().onClose(Status.UNAVAILABLE, new Metadata()); + } + verify(channel, times(0)).shutdown(); + + pool.close(); + } + + @Test + void testCancelledDoesNotIncrementFailures() { + when(channelSupplier.get()).thenReturn(channel); + when(channel.newCall(any(), any())).thenReturn(clientCall); + doNothing().when(clientCall).start(listener.capture(), any()); + + ChannelPoolDpImpl pool = + new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor); + + for (int i = 0; i < 10; i++) { + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(mock(Listener.class), new Metadata()); + listener.getValue().onClose(Status.CANCELLED, new Metadata()); + } + + // Should never be recycled + verify(channel, times(0)).shutdown(); + verify(channelSupplier, times(1)).get(); + + pool.close(); + } }