Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
03e341c
feat(gax-grpc): add configurable resize delta and warning for repeate…
lqiu96 Apr 17, 2026
2060fcf
chore(gax-grpc): revert surefire plugin configuration in pom.xml
lqiu96 Apr 17, 2026
e876b31
chore(gax-grpc): add comments and remove magic numbers for resize delta
lqiu96 Apr 17, 2026
b3ffb64
docs(gax-grpc): explain resizing detection choice in comments
lqiu96 Apr 17, 2026
8a5daa1
chore(gax-grpc): replace magic number 5 with constant in ChannelPool
lqiu96 Apr 17, 2026
813e101
docs(gax-grpc): update javadoc for maxResizeDelta to explain burst ha…
lqiu96 Apr 17, 2026
91ffb52
chore: generate libraries at Fri Apr 17 21:02:58 UTC 2026
cloud-java-bot Apr 17, 2026
de28aef
docs(gax-grpc): reference MAX_RESIZE_DELTA constant in javadoc
lqiu96 Apr 17, 2026
827b22d
docs(gax-grpc): explain use of == for log threshold
lqiu96 Apr 17, 2026
899736f
feat(gax-grpc): optimize ChannelPool resize and add thread safety com…
lqiu96 Apr 17, 2026
f9792b6
style(gax-grpc): format ChannelPool.java
lqiu96 Apr 17, 2026
856f3f2
style(gax-grpc): use constant for warning message and simplify comments
lqiu96 Apr 17, 2026
a6a34dc
Merge branch 'main' into feat/channelpool-resizing
lqiu96 Apr 20, 2026
2be58da
Revert "chore: generate libraries at Fri Apr 17 21:02:58 UTC 2026"
lqiu96 Apr 20, 2026
e61486f
chore: Update ocmments and refactor
lqiu96 Apr 20, 2026
cbef704
feat(gax-grpc): remove maxResizeDelta validation and update javadoc
lqiu96 Apr 20, 2026
2895af5
style(gax-grpc): update javadoc in ChannelPoolSettings to be a warning
lqiu96 Apr 20, 2026
068887e
feat(gax-grpc): restore maxResizeDelta validation and javadoc
lqiu96 Apr 20, 2026
947fad5
docs(gax-grpc): add comment explaining resize delta clamping in stati…
lqiu96 Apr 20, 2026
f62aa0c
chore: Add a comment to explain the resize delta logic in static size
lqiu96 Apr 20, 2026
697c336
feat(gax-grpc): add cap of 25 to maxResizeDelta and test
lqiu96 Apr 20, 2026
387c68e
docs(gax-grpc): add warning about high resize delta values in javadoc
lqiu96 Apr 20, 2026
defe1bb
docs(gax-grpc): add warning about high resize delta values to setter …
lqiu96 Apr 20, 2026
e84b9d2
chore: Address sonar comments
lqiu96 Apr 21, 2026
f4ce3df
fix: Fix channel clamping logic in ChannelPools to respect channel bo…
lqiu96 Apr 21, 2026
f724364
Refine ChannelPool Javadoc and restore validation
lqiu96 Apr 21, 2026
0c8f65e
docs(gax-grpc): clarify runtime capping of maxResizeDelta and simplif…
lqiu96 Apr 21, 2026
1c205e3
docs(gax-grpc): clarify runtime capping of maxResizeDelta and simplif…
lqiu96 Apr 21, 2026
dd0a3fe
chore: Remove unneeded resize delta validation
lqiu96 Apr 21, 2026
ada273c
Merge branch 'main' into fix/channelpool-logic-fix
lqiu96 Apr 21, 2026
0cf1ba4
chore: Fix merge conflicts adding duplicated test
lqiu96 Apr 21, 2026
3c18a39
Merge branch 'main' into fix/channelpool-logic-fix
lqiu96 Apr 22, 2026
89271df
chore: Address PR comments
lqiu96 Apr 22, 2026
6f79c03
fix(gax-grpc): remove redundant clamping and add boundary tests
lqiu96 Apr 22, 2026
c671e66
chore: use clearer variable names
lqiu96 Apr 22, 2026
6654ce3
chore: Fix lint issues
lqiu96 Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ class ChannelPool extends ManagedChannel {

// Tracks the number of consecutive resize cycles where a resize actually occurred (either expand
// or shrink). Used to detect repeated resizing activity and log a warning.
// Note: This field is only accessed safely within resizeSafely() and does not need to be atomic.
private int consecutiveResizes = 0;
private final AtomicInteger consecutiveResizes = new AtomicInteger(0);

static ChannelPool create(
ChannelPoolSettings settings,
Expand Down Expand Up @@ -304,61 +303,82 @@ void resize() {
localEntries.stream().mapToInt(Entry::getAndResetMaxOutstanding).sum();

// Number of channels if each channel operated at max capacity
int minChannels =
int calculatedResizeMinChannels =
(int) Math.ceil(actualOutstandingRpcs / (double) settings.getMaxRpcsPerChannel());
// Limit the threshold to absolute range
if (minChannels < settings.getMinChannelCount()) {
minChannels = settings.getMinChannelCount();
if (calculatedResizeMinChannels < settings.getMinChannelCount()) {
calculatedResizeMinChannels = settings.getMinChannelCount();
}
// Limit in case the calculated min channel count exceeds the configured max channel count
if (calculatedResizeMinChannels > settings.getMaxChannelCount()) {
calculatedResizeMinChannels = settings.getMaxChannelCount();
}

// Number of channels if each channel operated at minimum capacity
// Note: getMinRpcsPerChannel() can return 0, but division by 0 shouldn't cause a problem.
int maxChannels =
int calculatedResizeMaxChannels =
(int) Math.ceil(actualOutstandingRpcs / (double) settings.getMinRpcsPerChannel());
// Limit the threshold to absolute range
if (maxChannels > settings.getMaxChannelCount()) {
maxChannels = settings.getMaxChannelCount();
if (calculatedResizeMaxChannels > settings.getMaxChannelCount()) {
calculatedResizeMaxChannels = settings.getMaxChannelCount();
}
if (maxChannels < minChannels) {
maxChannels = minChannels;
// Limit in case the calculated max channel count falls below the configured min channel count
if (calculatedResizeMaxChannels < settings.getMinChannelCount()) {
calculatedResizeMaxChannels = settings.getMinChannelCount();
}

// If the pool were to be resized, try to aim for the middle of the bound, but limit rate of
// change.
int tentativeTarget = (maxChannels + minChannels) / 2;
// If the pool were to be resized, try to aim for the middle of the bound. The tentativeTarget
// is guaranteed to be between configured min and max channel bounds
int tentativeTarget =
calculatedResizeMinChannels
+ (calculatedResizeMaxChannels - calculatedResizeMinChannels) / 2;
int currentSize = localEntries.size();

// Calculate the desired change in pool size.
int delta = tentativeTarget - currentSize;

// Dampen the rate of change if the desired delta exceeds the maximum allowed step size.
// Ensure that the step size is capped by the max channel count.
// Note: resize delta value is not enforced to be smaller than max channel count in
// ChannelPoolSettings as DEFAULT_RESIZE_DELTA is 2 and max channel pool count can be 1
int dampenedTarget = tentativeTarget;
if (Math.abs(delta) > settings.getMaxResizeDelta()) {
dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta);
int effectiveMaxResizeDelta =
Math.min(settings.getMaxResizeDelta(), settings.getMaxChannelCount());

// Rate-limit the change to not exceed the effectiveMaxResizeDelta
if (Math.abs(delta) > effectiveMaxResizeDelta) {
// Maintaining the correct direction (positive or negative) to handle expand/shrink
int step = delta > 0 ? effectiveMaxResizeDelta : -effectiveMaxResizeDelta;
dampenedTarget = currentSize + step;
}

// Only count as "resized" if the thresholds are crossed and Gax attempts to scale. Checking
// that `dampenedTarget != currentSize` would cause false positives when the pool is within
// bounds but not at the target (target aims for the middle of the bounds)
boolean resized = (currentSize < minChannels || currentSize > maxChannels);
boolean resized =
(currentSize < calculatedResizeMinChannels || currentSize > calculatedResizeMaxChannels);
if (resized) {
consecutiveResizes++;
consecutiveResizes.incrementAndGet();
} else {
consecutiveResizes = 0;
consecutiveResizes.set(0);
}

// Log warning only once when the consecutive threshold is reached to avoid spamming logs. Log
// message will repeat if the number of consecutive resizes resets (e.g. stabilizes for a bit).
// However, aim to log once to ensure that this does not incur log spam.
if (consecutiveResizes == CONSECUTIVE_RESIZE_THRESHOLD) {
if (consecutiveResizes.get() == CONSECUTIVE_RESIZE_THRESHOLD) {
LOG.warning(CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING);
}

// Only resize the pool when thresholds are crossed
if (localEntries.size() < minChannels) {
if (localEntries.size() < calculatedResizeMinChannels) {
LOG.fine(
String.format(
"Detected throughput peak of %d, expanding channel pool size: %d -> %d.",
actualOutstandingRpcs, currentSize, dampenedTarget));

expand(dampenedTarget);
} else if (localEntries.size() > maxChannels) {
} else if (localEntries.size() > calculatedResizeMaxChannels) {
LOG.fine(
String.format(
"Detected throughput drop to %d, shrinking channel pool size: %d -> %d.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,8 @@ public static ChannelPoolSettings staticallySized(int size) {
.setMaxRpcsPerChannel(Integer.MAX_VALUE)
.setMinChannelCount(size)
.setMaxChannelCount(size)
// Static pools don't resize so this value doesn't affect operation. However,
// validation still checks that resize delta doesn't exceed channel pool size.
.setMaxResizeDelta(Math.min(DEFAULT_MAX_RESIZE_DELTA, size))
// Static pools don't resize so this value doesn't affect operation.
.setMaxResizeDelta(DEFAULT_MAX_RESIZE_DELTA)
.build();
}

Expand All @@ -167,21 +166,57 @@ public static Builder builder() {

@AutoValue.Builder
public abstract static class Builder {
/**
* Sets the minimum desired number of concurrent RPCs per channel.
*
* <p>This ensures channels are adequately utilized. If the average load per channel falls below
* this value, the pool attempts to shrink. The resulting target channel count is a dynamic
* value determined by load and is bounded by {@link #setMinChannelCount} and {@link
* #setMaxChannelCount}.
*/
public abstract Builder setMinRpcsPerChannel(int count);

/**
* Sets the maximum desired number of concurrent RPCs per channel.
*
* <p>This ensures channels do not become overloaded. If the average load per channel exceeds
* this value, the pool attempts to expand. The resulting target channel count is a dynamic
* value determined by load and is bounded by {@link #setMinChannelCount} and {@link
* #setMaxChannelCount}.
*/
public abstract Builder setMaxRpcsPerChannel(int count);

/**
* Sets the minimum number of channels the pool can shrink to.
*
* <p>When resizing, if the calculated resize bounds fall below this minimum configuration, the
* bounds will be clamped to this value. This ensures the pool never shrinks below this absolute
* minimum, even under very low load.
*/
public abstract Builder setMinChannelCount(int count);

/**
* Sets the maximum number of channels the pool can expand to.
*
* <p>When resizing, if the calculated resize bounds exceed this maximum configuration, the
* bounds will be clamped to this value. This ensures the pool never expands above this absolute
* maximum, even under very high load.
*/
public abstract Builder setMaxChannelCount(int count);

/** Sets the initial number of channels in the pool. */
public abstract Builder setInitialChannelCount(int count);

/**
* Sets whether preemptive channel refresh is enabled to prevent channels from becoming idle.
*/
public abstract Builder setPreemptiveRefreshEnabled(boolean enabled);

/**
* Sets the maximum number of channels that can be added or removed in a single resize cycle.
* This acts as a rate limiter to prevent wild fluctuations.
* This acts as a rate limiter to prevent wild fluctuations. The pool resizes periodically
* according to {@link #RESIZE_INTERVAL} (default 1 minute). During resizing, this value is
* effectively capped by the bound configured via {@link #setMaxChannelCount}.
*
* <p><b>Warning:</b> Higher values for resize delta may still result in performance degradation
* during spikes due to rapid scaling.
Expand All @@ -198,7 +233,7 @@ public ChannelPoolSettings build() {
Preconditions.checkState(
s.getMinChannelCount() > 0, "Minimum channel count must be at least 1");
Preconditions.checkState(
s.getMinChannelCount() <= s.getMaxRpcsPerChannel(), "absolute channel range is invalid");
s.getMinChannelCount() <= s.getMaxChannelCount(), "absolute channel range is invalid");
Preconditions.checkState(
s.getMinChannelCount() <= s.getInitialChannelCount(),
"initial channel count be at least minChannelCount");
Expand All @@ -212,9 +247,6 @@ public ChannelPoolSettings build() {
Preconditions.checkState(
s.getMaxResizeDelta() <= MAX_ALLOWED_RESIZE_DELTA,
"Max resize delta cannot be greater than " + MAX_ALLOWED_RESIZE_DELTA);
Preconditions.checkState(
s.getMaxResizeDelta() <= s.getMaxChannelCount(),
"Max resize delta cannot be greater than max channel count");
return s;
}
}
Expand Down
Loading
Loading