diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java index deb7c284626a..c65f46ff8096 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java @@ -96,8 +96,7 @@ class ChannelPool extends ManagedChannel { // Tracks the number of consecutive resize cycles where a resize actually occurred (either expand // or shrink). Used to detect repeated resizing activity and log a warning. - // Note: This field is only accessed safely within resizeSafely() and does not need to be atomic. - private int consecutiveResizes = 0; + private final AtomicInteger consecutiveResizes = new AtomicInteger(0); static ChannelPool create( ChannelPoolSettings settings, @@ -304,61 +303,82 @@ void resize() { localEntries.stream().mapToInt(Entry::getAndResetMaxOutstanding).sum(); // Number of channels if each channel operated at max capacity - int minChannels = + int calculatedResizeMinChannels = (int) Math.ceil(actualOutstandingRpcs / (double) settings.getMaxRpcsPerChannel()); // Limit the threshold to absolute range - if (minChannels < settings.getMinChannelCount()) { - minChannels = settings.getMinChannelCount(); + if (calculatedResizeMinChannels < settings.getMinChannelCount()) { + calculatedResizeMinChannels = settings.getMinChannelCount(); + } + // Limit in case the calculated min channel count exceeds the configured max channel count + if (calculatedResizeMinChannels > settings.getMaxChannelCount()) { + calculatedResizeMinChannels = settings.getMaxChannelCount(); } // Number of channels if each channel operated at minimum capacity // Note: getMinRpcsPerChannel() can return 0, but division by 0 shouldn't cause a problem. - int maxChannels = + int calculatedResizeMaxChannels = (int) Math.ceil(actualOutstandingRpcs / (double) settings.getMinRpcsPerChannel()); // Limit the threshold to absolute range - if (maxChannels > settings.getMaxChannelCount()) { - maxChannels = settings.getMaxChannelCount(); + if (calculatedResizeMaxChannels > settings.getMaxChannelCount()) { + calculatedResizeMaxChannels = settings.getMaxChannelCount(); } - if (maxChannels < minChannels) { - maxChannels = minChannels; + // Limit in case the calculated max channel count falls below the configured min channel count + if (calculatedResizeMaxChannels < settings.getMinChannelCount()) { + calculatedResizeMaxChannels = settings.getMinChannelCount(); } - // If the pool were to be resized, try to aim for the middle of the bound, but limit rate of - // change. - int tentativeTarget = (maxChannels + minChannels) / 2; + // If the pool were to be resized, try to aim for the middle of the bound. The tentativeTarget + // is guaranteed to be between configured min and max channel bounds + int tentativeTarget = + calculatedResizeMinChannels + + (calculatedResizeMaxChannels - calculatedResizeMinChannels) / 2; int currentSize = localEntries.size(); + + // Calculate the desired change in pool size. int delta = tentativeTarget - currentSize; + + // Dampen the rate of change if the desired delta exceeds the maximum allowed step size. + // Ensure that the step size is capped by the max channel count. + // Note: resize delta value is not enforced to be smaller than max channel count in + // ChannelPoolSettings as DEFAULT_RESIZE_DELTA is 2 and max channel pool count can be 1 int dampenedTarget = tentativeTarget; - if (Math.abs(delta) > settings.getMaxResizeDelta()) { - dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta); + int effectiveMaxResizeDelta = + Math.min(settings.getMaxResizeDelta(), settings.getMaxChannelCount()); + + // Rate-limit the change to not exceed the effectiveMaxResizeDelta + if (Math.abs(delta) > effectiveMaxResizeDelta) { + // Maintaining the correct direction (positive or negative) to handle expand/shrink + int step = delta > 0 ? effectiveMaxResizeDelta : -effectiveMaxResizeDelta; + dampenedTarget = currentSize + step; } // Only count as "resized" if the thresholds are crossed and Gax attempts to scale. Checking // that `dampenedTarget != currentSize` would cause false positives when the pool is within // bounds but not at the target (target aims for the middle of the bounds) - boolean resized = (currentSize < minChannels || currentSize > maxChannels); + boolean resized = + (currentSize < calculatedResizeMinChannels || currentSize > calculatedResizeMaxChannels); if (resized) { - consecutiveResizes++; + consecutiveResizes.incrementAndGet(); } else { - consecutiveResizes = 0; + consecutiveResizes.set(0); } // Log warning only once when the consecutive threshold is reached to avoid spamming logs. Log // message will repeat if the number of consecutive resizes resets (e.g. stabilizes for a bit). // However, aim to log once to ensure that this does not incur log spam. - if (consecutiveResizes == CONSECUTIVE_RESIZE_THRESHOLD) { + if (consecutiveResizes.get() == CONSECUTIVE_RESIZE_THRESHOLD) { LOG.warning(CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING); } // Only resize the pool when thresholds are crossed - if (localEntries.size() < minChannels) { + if (localEntries.size() < calculatedResizeMinChannels) { LOG.fine( String.format( "Detected throughput peak of %d, expanding channel pool size: %d -> %d.", actualOutstandingRpcs, currentSize, dampenedTarget)); expand(dampenedTarget); - } else if (localEntries.size() > maxChannels) { + } else if (localEntries.size() > calculatedResizeMaxChannels) { LOG.fine( String.format( "Detected throughput drop to %d, shrinking channel pool size: %d -> %d.", diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java index 35feb89772c7..559e1197bc59 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java @@ -148,9 +148,8 @@ public static ChannelPoolSettings staticallySized(int size) { .setMaxRpcsPerChannel(Integer.MAX_VALUE) .setMinChannelCount(size) .setMaxChannelCount(size) - // Static pools don't resize so this value doesn't affect operation. However, - // validation still checks that resize delta doesn't exceed channel pool size. - .setMaxResizeDelta(Math.min(DEFAULT_MAX_RESIZE_DELTA, size)) + // Static pools don't resize so this value doesn't affect operation. + .setMaxResizeDelta(DEFAULT_MAX_RESIZE_DELTA) .build(); } @@ -167,21 +166,57 @@ public static Builder builder() { @AutoValue.Builder public abstract static class Builder { + /** + * Sets the minimum desired number of concurrent RPCs per channel. + * + *

This ensures channels are adequately utilized. If the average load per channel falls below + * this value, the pool attempts to shrink. The resulting target channel count is a dynamic + * value determined by load and is bounded by {@link #setMinChannelCount} and {@link + * #setMaxChannelCount}. + */ public abstract Builder setMinRpcsPerChannel(int count); + /** + * Sets the maximum desired number of concurrent RPCs per channel. + * + *

This ensures channels do not become overloaded. If the average load per channel exceeds + * this value, the pool attempts to expand. The resulting target channel count is a dynamic + * value determined by load and is bounded by {@link #setMinChannelCount} and {@link + * #setMaxChannelCount}. + */ public abstract Builder setMaxRpcsPerChannel(int count); + /** + * Sets the minimum number of channels the pool can shrink to. + * + *

When resizing, if the calculated resize bounds fall below this minimum configuration, the + * bounds will be clamped to this value. This ensures the pool never shrinks below this absolute + * minimum, even under very low load. + */ public abstract Builder setMinChannelCount(int count); + /** + * Sets the maximum number of channels the pool can expand to. + * + *

When resizing, if the calculated resize bounds exceed this maximum configuration, the + * bounds will be clamped to this value. This ensures the pool never expands above this absolute + * maximum, even under very high load. + */ public abstract Builder setMaxChannelCount(int count); + /** Sets the initial number of channels in the pool. */ public abstract Builder setInitialChannelCount(int count); + /** + * Sets whether preemptive channel refresh is enabled to prevent channels from becoming idle. + */ public abstract Builder setPreemptiveRefreshEnabled(boolean enabled); /** * Sets the maximum number of channels that can be added or removed in a single resize cycle. - * This acts as a rate limiter to prevent wild fluctuations. + * This acts as a rate limiter to prevent wild fluctuations. The pool resizes periodically + * according to {@link #RESIZE_INTERVAL} (default 1 minute). During resizing, this value is + * effectively capped by the bound configured via {@link #setMaxChannelCount}. * *

Warning: Higher values for resize delta may still result in performance degradation * during spikes due to rapid scaling. @@ -198,7 +233,7 @@ public ChannelPoolSettings build() { Preconditions.checkState( s.getMinChannelCount() > 0, "Minimum channel count must be at least 1"); Preconditions.checkState( - s.getMinChannelCount() <= s.getMaxRpcsPerChannel(), "absolute channel range is invalid"); + s.getMinChannelCount() <= s.getMaxChannelCount(), "absolute channel range is invalid"); Preconditions.checkState( s.getMinChannelCount() <= s.getInitialChannelCount(), "initial channel count be at least minChannelCount"); @@ -212,9 +247,6 @@ public ChannelPoolSettings build() { Preconditions.checkState( s.getMaxResizeDelta() <= MAX_ALLOWED_RESIZE_DELTA, "Max resize delta cannot be greater than " + MAX_ALLOWED_RESIZE_DELTA); - Preconditions.checkState( - s.getMaxResizeDelta() <= s.getMaxChannelCount(), - "Max resize delta cannot be greater than max channel count"); return s; } } diff --git a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java index d1d396a75b7b..6337c85b55fe 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java @@ -145,6 +145,26 @@ private void verifyTargetChannel( } } + private static ChannelFactory createMockChannelFactory( + List channels, List> startedCalls) { + return () -> { + ManagedChannel channel = Mockito.mock(ManagedChannel.class); + Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) + .thenAnswer( + invocation -> { + @SuppressWarnings("unchecked") + ClientCall clientCall = Mockito.mock(ClientCall.class); + if (startedCalls != null) { + startedCalls.add(clientCall); + } + return clientCall; + }); + + channels.add(channel); + return channel; + }; + } + @Test void ensureEvenDistribution() throws InterruptedException, IOException { int numChannels = 10; @@ -452,21 +472,7 @@ void channelCountShouldNotChangeWhenOutstandingRpcsAreWithinLimits() throws Exce List channels = new ArrayList<>(); List> startedCalls = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - startedCalls.add(clientCall); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool( @@ -531,20 +537,7 @@ void customResizeDeltaIsRespected() throws Exception { List channels = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, null); pool = new ChannelPool( @@ -576,20 +569,8 @@ void removedIdleChannelsAreShutdown() throws Exception { List channels = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + List> startedCalls = new ArrayList<>(); + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool( @@ -616,21 +597,7 @@ void removedActiveChannelsAreShutdown() throws Exception { List channels = new ArrayList<>(); List> startedCalls = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - startedCalls.add(clientCall); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool( @@ -728,18 +695,7 @@ void repeatedResizingLogsWarningOnExpand() throws Exception { ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); FixedExecutorProvider provider = FixedExecutorProvider.create(executor); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - return clientCall; - }); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(new ArrayList<>(), null); pool = new ChannelPool( @@ -891,4 +847,68 @@ void settingsValidationFailsWhenMaxResizeDeltaExceedsLimit() { ChannelPoolSettings.builder().setMaxResizeDelta(26).setMaxChannelCount(30); assertThrows(IllegalStateException.class, builder::build); } + + @Test + void minChannelsClampedToMaxChannelCountUnderHighLoad() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + FixedExecutorProvider provider = FixedExecutorProvider.create(executor); + + List channels = new ArrayList<>(); + ChannelFactory channelFactory = createMockChannelFactory(channels, null); + + pool = + new ChannelPool( + ChannelPoolSettings.builder() + .setInitialChannelCount(1) + .setMinRpcsPerChannel(1) + .setMaxRpcsPerChannel(2) + .setMaxResizeDelta(10) + .setMinChannelCount(1) + .setMaxChannelCount(5) + .build(), + channelFactory, + provider); + assertThat(pool.entries.get()).hasSize(1); + + // Add 20 RPCs, which would require 10 channels (20/2) + // But max is 5 + for (int i = 0; i < 20; i++) { + ClientCalls.futureUnaryCall( + pool.newCall(METHOD_RECOGNIZE, CallOptions.DEFAULT), Color.getDefaultInstance()); + } + + pool.resize(); + + // Should be clamped to maxChannelCount = 5 + assertThat(pool.entries.get()).hasSize(5); + } + + @Test + void maxChannelsClampedToMinChannelCountUnderLowLoad() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + FixedExecutorProvider provider = FixedExecutorProvider.create(executor); + + List channels = new ArrayList<>(); + ChannelFactory channelFactory = createMockChannelFactory(channels, null); + + pool = + new ChannelPool( + ChannelPoolSettings.builder() + .setInitialChannelCount(5) + .setMinRpcsPerChannel(1) + .setMaxRpcsPerChannel(2) + .setMinChannelCount(3) + .setMaxChannelCount(10) + .build(), + channelFactory, + provider); + assertThat(pool.entries.get()).hasSize(5); + + // With no outstanding RPCs, the pool should want to shrink to 0 + // But min is 3 + pool.resize(); + + // Should be clamped to minChannelCount = 3 + assertThat(pool.entries.get()).hasSize(3); + } }