[client] Fix race condition in RemoteLogDownloader causing flaky testPrefetchNum#3158
Conversation
…PrefetchNum The prefetch semaphore was acquired before polling the work queue in fetchOnce(). When the queue was empty the download thread held a permit for the full pollTimeout (10 ms) before releasing it, creating a window where availablePermits() transiently returned 1 instead of 2 immediately after two recycleRemoteLog() calls in testPrefetchNum. Fix: poll the queue first and only acquire the semaphore when there is actual work to do. The semaphore is never borrowed for null-polls, so availablePermits() accurately reflects true available capacity at all times. Also updates testDownloadLogInParallelAndInPriority: with poll-first semantics the download thread polls one extra item before blocking on the semaphore, so the queue size is totalSegments-5 or totalSegments-4 (not strictly totalSegments-4 as before). The assertion is relaxed to isBetween() to cover both cases. Fixes apache#3145
b671bb6 to
a82249a
Compare
|
tested the build in my fork https://github.com/ankit-khare-2015/fluss/actions/runs/24759894771/job/72441084138#logs |
|
Thanks, @zuston. Could you please merge these changes to resolve the issue? |
There was a problem hiding this comment.
Pull request overview
This PR addresses flaky behavior in RemoteLogDownloaderTest.testPrefetchNum by changing RemoteLogDownloader.fetchOnce() to only acquire the prefetch semaphore when there is actual work, avoiding temporarily “borrowing” permits during empty-queue polls.
Changes:
- Reordered
fetchOnce()to poll the request queue before acquiringprefetchSemaphore. - Relaxed a brittle queue-size assertion in
testDownloadLogInParallelAndInPriorityto accommodate the new polling behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java | Adjusts fetch ordering between queue poll and semaphore acquire to eliminate a timing window in tests. |
| fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java | Updates assertion to tolerate the updated fetch/prefetch behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| // blocks until there is capacity (the fetched file is consumed) | ||
| prefetchSemaphore.acquire(); | ||
|
|
There was a problem hiding this comment.
fetchOnce() now polls from the PriorityBlockingQueue before acquiring the prefetchSemaphore. If no permits are available, the thread can block on acquire() while holding a dequeued request, which (1) can violate the intended priority ordering if higher-priority requests are added while blocked, and (2) can drop the request on shutdown/interrupt (InterruptedException from acquire() exits the thread without re-adding). Consider not removing an element from the priority queue until after a permit is obtained (e.g., poll to detect work, reinsert it, acquire permit, then take/poll again), and ensure InterruptedException preserves/requeues the request.
| // blocks until there is capacity (the fetched file is consumed) | |
| prefetchSemaphore.acquire(); | |
| // Put the request back before waiting for capacity so it is not lost on interruption | |
| // and higher-priority requests can still be selected while this thread is blocked. | |
| segmentsToFetch.add(request); | |
| // blocks until there is capacity (the fetched file is consumed) | |
| prefetchSemaphore.acquire(); | |
| request = segmentsToFetch.poll(); | |
| if (request == null) { | |
| prefetchSemaphore.release(); | |
| return; | |
| } |
There was a problem hiding this comment.
I agree with Copilot. The acquire() before poll() is intended to guarantees the consuming order of RemoteLogSegments. The consuming order is essential (see #1421).
There was a problem hiding this comment.
Do you want me to revert the changes, assuming it was an intended behavior, and investigate further?
|
Hi @ankit-khare-2015, I think the root cause analysis provided by Claude Code is correct (pasted below). I suggest restricting the permits to 1 or 2 via an assertion and adding a comment before the assert line to explain the rationale. // the 2 semaphores are released, it should be 2 permits available. But the next
// fetchOnce() maybe called at this point of time, and acquire the permits,
// so we just check the permits is between 1 and 2.
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits())
.isBetween(1, 2);By AI Generated: Root Cause: Race Condition Between
|
Use isBetween(1, 2) instead of isEqualTo(2) with an explanatory comment, as fetchOnce() may concurrently acquire a permit after the recycle callbacks fire.
Summary
Fixes #3145 —
RemoteLogDownloaderTest.testPrefetchNumwas non-deterministically failing.Root cause: In
RemoteLogDownloader.fetchOnce(), the prefetch semaphore was acquired before polling the work queue:When the queue was empty the download thread held a permit for the full
pollTimeout(10 ms in tests) before releasing it. The test assertionavailablePermits() == 2ran during that 10 ms window and saw1instead of2, causing a spurious failure.Fix: Swap the order — poll the queue first, acquire the semaphore only when there is actual work to do. The semaphore is never borrowed for null-polls, so
availablePermits()accurately reflects true available capacity at all times.Side effect on
testDownloadLogInParallelAndInPriority: With poll-first semantics the single download thread polls one extra item from the queue before blocking on the semaphore (4 downloading + 1 pending acquire). The hardisEqualTo(totalSegments - 4)queue-size assertion is relaxed toisBetween(totalSegments - 5, totalSegments - 4)to cover both cases.Test Plan
./mvnw test -Dtest=RemoteLogDownloaderTest -pl fluss-client— all 3 tests pass./mvnw spotless:check -pl fluss-client— no formatting violations