diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java index adf806544457a..e0ef8e4072ce0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java @@ -1001,47 +1001,45 @@ public IoTConsensusV2TsFileWriter tryToFindCorrespondingWriter(TCommitId commitI return tsFileWriter.orElse(null); } - @SuppressWarnings("java:S3655") public IoTConsensusV2TsFileWriter borrowCorrespondingWriter(TCommitId commitId) { - Optional tsFileWriter = + final Optional correspondingWriter = iotConsensusV2TsFileWriterPool.stream() .filter( item -> item.isUsed() && Objects.equals(commitId, item.getCommitIdOfCorrespondingHolderEvent())) .findFirst(); + if (correspondingWriter.isPresent()) { + return correspondingWriter.get().refreshLastUsedTs(); + } - // If the TsFileInsertionEvent is first using tsFileWriter, we will find the first available - // buffer for it. - if (!tsFileWriter.isPresent()) { - // We should synchronously find the idle writer to avoid concurrency issues. - lock.lock(); - try { - // We need to check tsFileWriter.isPresent() here. Since there may be both retry-sent - // tsfile - // events and real-time-sent tsfile events, causing the receiver's tsFileWriter load to - // exceed IOTDB_CONFIG.getIoTConsensusV2PipelineSize(). - while (!tsFileWriter.isPresent()) { - tsFileWriter = - iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst(); - condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS); + lock.lock(); + try { + while (true) { + final Optional idleWriter = + iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst(); + if (idleWriter.isPresent()) { + final IoTConsensusV2TsFileWriter writer = idleWriter.get(); + // Publish commitId before marking the writer as used so lock-free lookup callers + // observing isUsed=true can always see the bound commitId as well. + writer.setCommitIdOfCorrespondingHolderEvent(commitId); + writer.setUsed(true); + return writer.refreshLastUsedTs(); } - tsFileWriter.get().setUsed(true); - tsFileWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - final String errorStr = - String.format( - "IoTConsensusV2%s: receiver thread get interrupted when waiting for borrowing tsFileWriter.", - consensusPipeName); - LOGGER.warn(errorStr); - throw new RuntimeException(errorStr); - } finally { - lock.unlock(); + + condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS); } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + final String errorStr = + String.format( + "IoTConsensusV2%s: receiver thread get interrupted when waiting for borrowing tsFileWriter.", + consensusPipeName); + LOGGER.warn(errorStr); + throw new RuntimeException(errorStr); + } finally { + lock.unlock(); } - - return tsFileWriter.get().refreshLastUsedTs(); } private void checkZombieTsFileWriter() {