diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java index deb7c284626a..c65f46ff8096 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java @@ -96,8 +96,7 @@ class ChannelPool extends ManagedChannel { // Tracks the number of consecutive resize cycles where a resize actually occurred (either expand // or shrink). Used to detect repeated resizing activity and log a warning. - // Note: This field is only accessed safely within resizeSafely() and does not need to be atomic. - private int consecutiveResizes = 0; + private final AtomicInteger consecutiveResizes = new AtomicInteger(0); static ChannelPool create( ChannelPoolSettings settings, @@ -304,61 +303,82 @@ void resize() { localEntries.stream().mapToInt(Entry::getAndResetMaxOutstanding).sum(); // Number of channels if each channel operated at max capacity - int minChannels = + int calculatedResizeMinChannels = (int) Math.ceil(actualOutstandingRpcs / (double) settings.getMaxRpcsPerChannel()); // Limit the threshold to absolute range - if (minChannels < settings.getMinChannelCount()) { - minChannels = settings.getMinChannelCount(); + if (calculatedResizeMinChannels < settings.getMinChannelCount()) { + calculatedResizeMinChannels = settings.getMinChannelCount(); + } + // Limit in case the calculated min channel count exceeds the configured max channel count + if (calculatedResizeMinChannels > settings.getMaxChannelCount()) { + calculatedResizeMinChannels = settings.getMaxChannelCount(); } // Number of channels if each channel operated at minimum capacity // Note: getMinRpcsPerChannel() can return 0, but division by 0 shouldn't cause a problem. - int maxChannels = + int calculatedResizeMaxChannels = (int) Math.ceil(actualOutstandingRpcs / (double) settings.getMinRpcsPerChannel()); // Limit the threshold to absolute range - if (maxChannels > settings.getMaxChannelCount()) { - maxChannels = settings.getMaxChannelCount(); + if (calculatedResizeMaxChannels > settings.getMaxChannelCount()) { + calculatedResizeMaxChannels = settings.getMaxChannelCount(); } - if (maxChannels < minChannels) { - maxChannels = minChannels; + // Limit in case the calculated max channel count falls below the configured min channel count + if (calculatedResizeMaxChannels < settings.getMinChannelCount()) { + calculatedResizeMaxChannels = settings.getMinChannelCount(); } - // If the pool were to be resized, try to aim for the middle of the bound, but limit rate of - // change. - int tentativeTarget = (maxChannels + minChannels) / 2; + // If the pool were to be resized, try to aim for the middle of the bound. The tentativeTarget + // is guaranteed to be between configured min and max channel bounds + int tentativeTarget = + calculatedResizeMinChannels + + (calculatedResizeMaxChannels - calculatedResizeMinChannels) / 2; int currentSize = localEntries.size(); + + // Calculate the desired change in pool size. int delta = tentativeTarget - currentSize; + + // Dampen the rate of change if the desired delta exceeds the maximum allowed step size. + // Ensure that the step size is capped by the max channel count. + // Note: resize delta value is not enforced to be smaller than max channel count in + // ChannelPoolSettings as DEFAULT_RESIZE_DELTA is 2 and max channel pool count can be 1 int dampenedTarget = tentativeTarget; - if (Math.abs(delta) > settings.getMaxResizeDelta()) { - dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta); + int effectiveMaxResizeDelta = + Math.min(settings.getMaxResizeDelta(), settings.getMaxChannelCount()); + + // Rate-limit the change to not exceed the effectiveMaxResizeDelta + if (Math.abs(delta) > effectiveMaxResizeDelta) { + // Maintaining the correct direction (positive or negative) to handle expand/shrink + int step = delta > 0 ? effectiveMaxResizeDelta : -effectiveMaxResizeDelta; + dampenedTarget = currentSize + step; } // Only count as "resized" if the thresholds are crossed and Gax attempts to scale. Checking // that `dampenedTarget != currentSize` would cause false positives when the pool is within // bounds but not at the target (target aims for the middle of the bounds) - boolean resized = (currentSize < minChannels || currentSize > maxChannels); + boolean resized = + (currentSize < calculatedResizeMinChannels || currentSize > calculatedResizeMaxChannels); if (resized) { - consecutiveResizes++; + consecutiveResizes.incrementAndGet(); } else { - consecutiveResizes = 0; + consecutiveResizes.set(0); } // Log warning only once when the consecutive threshold is reached to avoid spamming logs. Log // message will repeat if the number of consecutive resizes resets (e.g. stabilizes for a bit). // However, aim to log once to ensure that this does not incur log spam. - if (consecutiveResizes == CONSECUTIVE_RESIZE_THRESHOLD) { + if (consecutiveResizes.get() == CONSECUTIVE_RESIZE_THRESHOLD) { LOG.warning(CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING); } // Only resize the pool when thresholds are crossed - if (localEntries.size() < minChannels) { + if (localEntries.size() < calculatedResizeMinChannels) { LOG.fine( String.format( "Detected throughput peak of %d, expanding channel pool size: %d -> %d.", actualOutstandingRpcs, currentSize, dampenedTarget)); expand(dampenedTarget); - } else if (localEntries.size() > maxChannels) { + } else if (localEntries.size() > calculatedResizeMaxChannels) { LOG.fine( String.format( "Detected throughput drop to %d, shrinking channel pool size: %d -> %d.", diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java index 35feb89772c7..559e1197bc59 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java @@ -148,9 +148,8 @@ public static ChannelPoolSettings staticallySized(int size) { .setMaxRpcsPerChannel(Integer.MAX_VALUE) .setMinChannelCount(size) .setMaxChannelCount(size) - // Static pools don't resize so this value doesn't affect operation. However, - // validation still checks that resize delta doesn't exceed channel pool size. - .setMaxResizeDelta(Math.min(DEFAULT_MAX_RESIZE_DELTA, size)) + // Static pools don't resize so this value doesn't affect operation. + .setMaxResizeDelta(DEFAULT_MAX_RESIZE_DELTA) .build(); } @@ -167,21 +166,57 @@ public static Builder builder() { @AutoValue.Builder public abstract static class Builder { + /** + * Sets the minimum desired number of concurrent RPCs per channel. + * + *
This ensures channels are adequately utilized. If the average load per channel falls below + * this value, the pool attempts to shrink. The resulting target channel count is a dynamic + * value determined by load and is bounded by {@link #setMinChannelCount} and {@link + * #setMaxChannelCount}. + */ public abstract Builder setMinRpcsPerChannel(int count); + /** + * Sets the maximum desired number of concurrent RPCs per channel. + * + *
This ensures channels do not become overloaded. If the average load per channel exceeds + * this value, the pool attempts to expand. The resulting target channel count is a dynamic + * value determined by load and is bounded by {@link #setMinChannelCount} and {@link + * #setMaxChannelCount}. + */ public abstract Builder setMaxRpcsPerChannel(int count); + /** + * Sets the minimum number of channels the pool can shrink to. + * + *
When resizing, if the calculated resize bounds fall below this minimum configuration, the + * bounds will be clamped to this value. This ensures the pool never shrinks below this absolute + * minimum, even under very low load. + */ public abstract Builder setMinChannelCount(int count); + /** + * Sets the maximum number of channels the pool can expand to. + * + *
When resizing, if the calculated resize bounds exceed this maximum configuration, the + * bounds will be clamped to this value. This ensures the pool never expands above this absolute + * maximum, even under very high load. + */ public abstract Builder setMaxChannelCount(int count); + /** Sets the initial number of channels in the pool. */ public abstract Builder setInitialChannelCount(int count); + /** + * Sets whether preemptive channel refresh is enabled to prevent channels from becoming idle. + */ public abstract Builder setPreemptiveRefreshEnabled(boolean enabled); /** * Sets the maximum number of channels that can be added or removed in a single resize cycle. - * This acts as a rate limiter to prevent wild fluctuations. + * This acts as a rate limiter to prevent wild fluctuations. The pool resizes periodically + * according to {@link #RESIZE_INTERVAL} (default 1 minute). During resizing, this value is + * effectively capped by the bound configured via {@link #setMaxChannelCount}. * *
Warning: Higher values for resize delta may still result in performance degradation
* during spikes due to rapid scaling.
@@ -198,7 +233,7 @@ public ChannelPoolSettings build() {
Preconditions.checkState(
s.getMinChannelCount() > 0, "Minimum channel count must be at least 1");
Preconditions.checkState(
- s.getMinChannelCount() <= s.getMaxRpcsPerChannel(), "absolute channel range is invalid");
+ s.getMinChannelCount() <= s.getMaxChannelCount(), "absolute channel range is invalid");
Preconditions.checkState(
s.getMinChannelCount() <= s.getInitialChannelCount(),
"initial channel count be at least minChannelCount");
@@ -212,9 +247,6 @@ public ChannelPoolSettings build() {
Preconditions.checkState(
s.getMaxResizeDelta() <= MAX_ALLOWED_RESIZE_DELTA,
"Max resize delta cannot be greater than " + MAX_ALLOWED_RESIZE_DELTA);
- Preconditions.checkState(
- s.getMaxResizeDelta() <= s.getMaxChannelCount(),
- "Max resize delta cannot be greater than max channel count");
return s;
}
}
diff --git a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java
index d1d396a75b7b..6337c85b55fe 100644
--- a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java
+++ b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java
@@ -145,6 +145,26 @@ private void verifyTargetChannel(
}
}
+ private static ChannelFactory createMockChannelFactory(
+ List