Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1001,47 +1001,45 @@ public IoTConsensusV2TsFileWriter tryToFindCorrespondingWriter(TCommitId commitI
return tsFileWriter.orElse(null);
}

@SuppressWarnings("java:S3655")
public IoTConsensusV2TsFileWriter borrowCorrespondingWriter(TCommitId commitId) {
Optional<IoTConsensusV2TsFileWriter> tsFileWriter =
final Optional<IoTConsensusV2TsFileWriter> correspondingWriter =
iotConsensusV2TsFileWriterPool.stream()
.filter(
item ->
item.isUsed()
&& Objects.equals(commitId, item.getCommitIdOfCorrespondingHolderEvent()))
.findFirst();
if (correspondingWriter.isPresent()) {
return correspondingWriter.get().refreshLastUsedTs();
}

// If the TsFileInsertionEvent is first using tsFileWriter, we will find the first available
// buffer for it.
if (!tsFileWriter.isPresent()) {
// We should synchronously find the idle writer to avoid concurrency issues.
lock.lock();
try {
// We need to check tsFileWriter.isPresent() here. Since there may be both retry-sent
// tsfile
// events and real-time-sent tsfile events, causing the receiver's tsFileWriter load to
// exceed IOTDB_CONFIG.getIoTConsensusV2PipelineSize().
while (!tsFileWriter.isPresent()) {
tsFileWriter =
iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst();
condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);
lock.lock();
try {
while (true) {
final Optional<IoTConsensusV2TsFileWriter> idleWriter =
iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst();
if (idleWriter.isPresent()) {
final IoTConsensusV2TsFileWriter writer = idleWriter.get();
// Publish commitId before marking the writer as used so lock-free lookup callers
// observing isUsed=true can always see the bound commitId as well.
writer.setCommitIdOfCorrespondingHolderEvent(commitId);
writer.setUsed(true);
return writer.refreshLastUsedTs();
}
tsFileWriter.get().setUsed(true);
tsFileWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
final String errorStr =
String.format(
"IoTConsensusV2%s: receiver thread get interrupted when waiting for borrowing tsFileWriter.",
consensusPipeName);
LOGGER.warn(errorStr);
throw new RuntimeException(errorStr);
} finally {
lock.unlock();

condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
final String errorStr =
String.format(
"IoTConsensusV2%s: receiver thread get interrupted when waiting for borrowing tsFileWriter.",
consensusPipeName);
LOGGER.warn(errorStr);
throw new RuntimeException(errorStr);
} finally {
lock.unlock();
}

return tsFileWriter.get().refreshLastUsedTs();
}

private void checkZombieTsFileWriter() {
Expand Down