From e1a694b0d28781bccd2dc3fc7c9de9e19f30ff50 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Thu, 16 Apr 2026 10:31:26 +0800 Subject: [PATCH 1/3] Fix IoTConsensusV2 receiver writer borrow race --- .../IoTConsensusV2Receiver.java | 68 +++++++++---------- 1 file changed, 32 insertions(+), 36 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java index adf806544457a..839d6e6bbb9c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java @@ -1001,47 +1001,43 @@ public IoTConsensusV2TsFileWriter tryToFindCorrespondingWriter(TCommitId commitI return tsFileWriter.orElse(null); } - @SuppressWarnings("java:S3655") public IoTConsensusV2TsFileWriter borrowCorrespondingWriter(TCommitId commitId) { - Optional tsFileWriter = - iotConsensusV2TsFileWriterPool.stream() - .filter( - item -> - item.isUsed() - && Objects.equals(commitId, item.getCommitIdOfCorrespondingHolderEvent())) - .findFirst(); + lock.lock(); + try { + while (true) { + final Optional correspondingWriter = + iotConsensusV2TsFileWriterPool.stream() + .filter( + item -> + item.isUsed() + && Objects.equals( + commitId, item.getCommitIdOfCorrespondingHolderEvent())) + .findFirst(); + if (correspondingWriter.isPresent()) { + return correspondingWriter.get().refreshLastUsedTs(); + } - // If the TsFileInsertionEvent is first using tsFileWriter, we will find the first available - // buffer for it. - if (!tsFileWriter.isPresent()) { - // We should synchronously find the idle writer to avoid concurrency issues. - lock.lock(); - try { - // We need to check tsFileWriter.isPresent() here. Since there may be both retry-sent - // tsfile - // events and real-time-sent tsfile events, causing the receiver's tsFileWriter load to - // exceed IOTDB_CONFIG.getIoTConsensusV2PipelineSize(). - while (!tsFileWriter.isPresent()) { - tsFileWriter = - iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst(); - condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS); + final Optional idleWriter = + iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst(); + if (idleWriter.isPresent()) { + idleWriter.get().setUsed(true); + idleWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId); + return idleWriter.get().refreshLastUsedTs(); } - tsFileWriter.get().setUsed(true); - tsFileWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - final String errorStr = - String.format( - "IoTConsensusV2%s: receiver thread get interrupted when waiting for borrowing tsFileWriter.", - consensusPipeName); - LOGGER.warn(errorStr); - throw new RuntimeException(errorStr); - } finally { - lock.unlock(); + + condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS); } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + final String errorStr = + String.format( + "IoTConsensusV2%s: receiver thread get interrupted when waiting for borrowing tsFileWriter.", + consensusPipeName); + LOGGER.warn(errorStr); + throw new RuntimeException(errorStr); + } finally { + lock.unlock(); } - - return tsFileWriter.get().refreshLastUsedTs(); } private void checkZombieTsFileWriter() { From 1d4324a7979aab443c58ca96b907e9b2c87116f4 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Thu, 16 Apr 2026 10:45:20 +0800 Subject: [PATCH 2/3] Keep IoTConsensusV2 writer borrow fast path --- .../IoTConsensusV2Receiver.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java index 839d6e6bbb9c0..9f7dae87ef269 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java @@ -1002,21 +1002,20 @@ public IoTConsensusV2TsFileWriter tryToFindCorrespondingWriter(TCommitId commitI } public IoTConsensusV2TsFileWriter borrowCorrespondingWriter(TCommitId commitId) { + final Optional correspondingWriter = + iotConsensusV2TsFileWriterPool.stream() + .filter( + item -> + item.isUsed() + && Objects.equals(commitId, item.getCommitIdOfCorrespondingHolderEvent())) + .findFirst(); + if (correspondingWriter.isPresent()) { + return correspondingWriter.get().refreshLastUsedTs(); + } + lock.lock(); try { while (true) { - final Optional correspondingWriter = - iotConsensusV2TsFileWriterPool.stream() - .filter( - item -> - item.isUsed() - && Objects.equals( - commitId, item.getCommitIdOfCorrespondingHolderEvent())) - .findFirst(); - if (correspondingWriter.isPresent()) { - return correspondingWriter.get().refreshLastUsedTs(); - } - final Optional idleWriter = iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst(); if (idleWriter.isPresent()) { From 922620b5075eacf01e644fd3404e758f2ff73687 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Thu, 16 Apr 2026 15:24:15 +0800 Subject: [PATCH 3/3] fix data race --- .../protocol/iotconsensusv2/IoTConsensusV2Receiver.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java index 9f7dae87ef269..e0ef8e4072ce0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java @@ -1019,9 +1019,12 @@ public IoTConsensusV2TsFileWriter borrowCorrespondingWriter(TCommitId commitId) final Optional idleWriter = iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst(); if (idleWriter.isPresent()) { - idleWriter.get().setUsed(true); - idleWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId); - return idleWriter.get().refreshLastUsedTs(); + final IoTConsensusV2TsFileWriter writer = idleWriter.get(); + // Publish commitId before marking the writer as used so lock-free lookup callers + // observing isUsed=true can always see the bound commitId as well. + writer.setCommitIdOfCorrespondingHolderEvent(commitId); + writer.setUsed(true); + return writer.refreshLastUsedTs(); } condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);