From 5afa1cfd32b27858d0bb1d39d709e3ac1a863b9d Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Tue, 14 Apr 2026 15:28:04 +0800 Subject: [PATCH 1/7] Pipe: deduplicate historical tsfile events per task scope --- ...icalDataRegionTsFileAndDeletionSource.java | 41 +++++++ .../PipeRealtimeDataRegionHybridSource.java | 9 +- .../PipeRealtimeDataRegionSource.java | 11 +- .../PipeRealtimeDataRegionTsFileSource.java | 7 +- .../PipeTsFileEpochProgressIndexKeeper.java | 29 +++-- ...ipeTsFileEpochProgressIndexKeeperTest.java | 116 ++++++++++++++++++ 6 files changed, 200 insertions(+), 13 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 66f8d48ce2813..187e75db22f2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -50,6 +50,7 @@ import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; @@ -124,6 +125,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource private String pipeName; private long creationTime; + private String tsFileDedupScopeID; private PipeTaskMeta pipeTaskMeta; private ProgressIndex startIndex; @@ -320,6 +322,14 @@ public void customize( } dataRegionId = environment.getRegionId(); + tsFileDedupScopeID = + pipeName + + "_" + + dataRegionId + + "_" + + creationTime + + "_" + + Integer.toHexString(System.identityHashCode(environment)); treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters); tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters); @@ -850,6 +860,30 @@ private Event supplyTsFileEvent(final TsFileResource resource) { return isReferenceCountIncreased ? progressReportEvent : null; } + if (shouldSkipHistoricalTsFileEvent(resource)) { + filteredTsFileResources2TableNames.remove(resource); + LOGGER.info( + "Pipe {}@{}: skip historical tsfile {} because realtime source in current task {} has already captured it.", + pipeName, + dataRegionId, + resource.getTsFilePath(), + tsFileDedupScopeID); + try { + return null; + } finally { + try { + PipeDataNodeResourceManager.tsfile() + .unpinTsFileResource(resource, shouldTransferModFile, pipeName); + } catch (final IOException e) { + LOGGER.warn( + "Pipe {}@{}: failed to unpin skipped historical TsFileResource, original path: {}", + pipeName, + dataRegionId, + resource.getTsFilePath()); + } + } + } + final PipeTsFileInsertionEvent event = new PipeTsFileInsertionEvent( isModelDetected ? isTableModel : null, @@ -916,6 +950,13 @@ private Event supplyTsFileEvent(final TsFileResource resource) { } } + private boolean shouldSkipHistoricalTsFileEvent(final TsFileResource resource) { + return pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX) + && DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2 + && PipeTsFileEpochProgressIndexKeeper.getInstance() + .containsTsFile(dataRegionId, tsFileDedupScopeID, resource.getTsFilePath()); + } + private Event supplyDeletionEvent(final DeletionResource deletionResource) { final PipeDeleteDataNodeEvent event = new PipeDeleteDataNodeEvent( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java index c9e3f35288a83..97b6d54fde55f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java @@ -83,7 +83,8 @@ private void extractTabletInsertion(final PipeRealtimeEvent event) { if (canNotUseTabletAnymore(event)) { event.getTsFileEpoch().migrateState(this, curState -> TsFileEpoch.State.USING_TSFILE); PipeTsFileEpochProgressIndexKeeper.getInstance() - .registerProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getResource()); + .registerProgressIndex( + dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getResource()); } else { event .getTsFileEpoch() @@ -156,7 +157,8 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) { case USING_TABLET: // If the state is USING_TABLET, discard the event PipeTsFileEpochProgressIndexKeeper.getInstance() - .eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath()); + .eliminateProgressIndex( + dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getFilePath()); event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(), false); return; case EMPTY: @@ -283,7 +285,8 @@ private Event supplyTsFileInsertion(final PipeRealtimeEvent event) { PipeDataNodeAgent.runtime() .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); PipeTsFileEpochProgressIndexKeeper.getInstance() - .eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath()); + .eliminateProgressIndex( + dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getFilePath()); return null; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index 718529243d9d0..1792e6a11928f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -133,6 +133,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { protected String pipeID; private String taskID; + private String tsFileDedupScopeID; protected long userId; protected String userName; protected String cliHostname; @@ -226,6 +227,8 @@ public void customize( creationTime = environment.getCreationTime(); pipeID = pipeName + "_" + creationTime; taskID = pipeName + "_" + dataRegionId + "_" + creationTime; + tsFileDedupScopeID = + taskID + "_" + Integer.toHexString(System.identityHashCode(environment)); treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters); tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters); @@ -322,6 +325,8 @@ public void close() throws Exception { if (dataRegionId >= 0) { PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, this); PipeTimePartitionListener.getInstance().stopListen(dataRegionId, this); + PipeTsFileEpochProgressIndexKeeper.getInstance() + .clearProgressIndex(dataRegionId, tsFileDedupScopeID); } synchronized (isClosed) { @@ -576,7 +581,7 @@ private void maySkipProgressIndexForRealtimeEvent(final PipeRealtimeEvent event) if (PipeTsFileEpochProgressIndexKeeper.getInstance() .isProgressIndexAfterOrEquals( dataRegionId, - pipeName, + tsFileDedupScopeID, event.getTsFileEpoch().getFilePath(), getProgressIndex4RealtimeEvent(event))) { event.skipReportOnCommit(); @@ -648,6 +653,10 @@ public String getTaskID() { return taskID; } + protected String getTsFileDedupScopeID() { + return tsFileDedupScopeID; + } + public void increaseExtractEpochSize() { extractEpochSize.incrementAndGet(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java index 98bfb30391a17..97c3138de7c7d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java @@ -52,7 +52,8 @@ protected void doExtract(PipeRealtimeEvent event) { event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); PipeTsFileEpochProgressIndexKeeper.getInstance() - .registerProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getResource()); + .registerProgressIndex( + dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getResource()); if (!(event.getEvent() instanceof TsFileInsertionEvent)) { event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(), false); @@ -104,7 +105,9 @@ public Event supply() { .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); PipeTsFileEpochProgressIndexKeeper.getInstance() .eliminateProgressIndex( - dataRegionId, pipeName, realtimeEvent.getTsFileEpoch().getFilePath()); + dataRegionId, + getTsFileDedupScopeID(), + realtimeEvent.getTsFileEpoch().getFilePath()); } realtimeEvent.decreaseReferenceCount( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java index bf15dcdc5475a..b9b1ee3c41908 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java @@ -31,34 +31,49 @@ public class PipeTsFileEpochProgressIndexKeeper { - // data region id -> pipeName -> tsFile path -> max progress index + // data region id -> task scope id -> tsFile path -> max progress index private final Map>> progressIndexKeeper = new ConcurrentHashMap<>(); public synchronized void registerProgressIndex( - final int dataRegionId, final String pipeName, final TsFileResource resource) { + final int dataRegionId, final String taskScopeID, final TsFileResource resource) { progressIndexKeeper .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) - .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) + .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>()) .putIfAbsent(resource.getTsFilePath(), resource); } public synchronized void eliminateProgressIndex( - final int dataRegionId, final @Nonnull String pipeName, final String filePath) { + final int dataRegionId, final @Nonnull String taskScopeID, final String filePath) { progressIndexKeeper .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) - .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) + .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>()) .remove(filePath); } + public synchronized void clearProgressIndex( + final int dataRegionId, final @Nonnull String taskScopeID) { + progressIndexKeeper + .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) + .remove(taskScopeID); + } + + public synchronized boolean containsTsFile( + final int dataRegionId, final @Nonnull String taskScopeID, final String tsFilePath) { + return progressIndexKeeper + .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) + .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>()) + .containsKey(tsFilePath); + } + public synchronized boolean isProgressIndexAfterOrEquals( final int dataRegionId, - final String pipeName, + final String taskScopeID, final String tsFilePath, final ProgressIndex progressIndex) { return progressIndexKeeper .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) - .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) + .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>()) .entrySet() .stream() .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath)) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java new file mode 100644 index 0000000000000..8926d63c2ebad --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner; + +import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; + +public class PipeTsFileEpochProgressIndexKeeperTest { + + private static final int DATA_REGION_ID = 1; + private static final String TASK_SCOPE_A = "task-scope-a"; + private static final String TASK_SCOPE_B = "task-scope-b"; + + private final PipeTsFileEpochProgressIndexKeeper keeper = + PipeTsFileEpochProgressIndexKeeper.getInstance(); + + private File tempDir; + + @Before + public void setUp() throws IOException { + tempDir = Files.createTempDirectory("pipeTsFileEpochProgressIndexKeeper").toFile(); + } + + @After + public void tearDown() { + keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_A); + keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_B); + FileUtils.deleteFileOrDirectory(tempDir); + } + + @Test + public void testDuplicateTsFileLookupIsScopedByTaskInstance() throws IOException { + final TsFileResource resource = createTsFileResource("shared.tsfile", 1L); + + keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, resource); + + Assert.assertTrue(keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_A, resource.getTsFilePath())); + Assert.assertFalse( + keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_B, resource.getTsFilePath())); + } + + @Test + public void testProgressIndexCheckDoesNotLeakAcrossTaskScopes() throws IOException { + keeper.registerProgressIndex( + DATA_REGION_ID, TASK_SCOPE_A, createTsFileResource("1-1-0-0.tsfile", 1L)); + + final TsFileResource comparedResource = createTsFileResource("1-2-0-0.tsfile", 2L); + keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, comparedResource); + + Assert.assertTrue( + keeper.isProgressIndexAfterOrEquals( + DATA_REGION_ID, + TASK_SCOPE_A, + comparedResource.getTsFilePath(), + new SimpleProgressIndex(1, 2L))); + Assert.assertFalse( + keeper.isProgressIndexAfterOrEquals( + DATA_REGION_ID, + TASK_SCOPE_B, + comparedResource.getTsFilePath(), + new SimpleProgressIndex(1, 2L))); + } + + @Test + public void testClearProgressIndexOnlyRemovesTargetTaskScope() throws IOException { + final TsFileResource scopeAResource = createTsFileResource("scope-a.tsfile", 1L); + final TsFileResource scopeBResource = createTsFileResource("scope-b.tsfile", 1L); + + keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, scopeAResource); + keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_B, scopeBResource); + + keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_A); + + Assert.assertFalse( + keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_A, scopeAResource.getTsFilePath())); + Assert.assertTrue( + keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_B, scopeBResource.getTsFilePath())); + } + + private TsFileResource createTsFileResource(final String fileName, final long flushOrderId) + throws IOException { + final File file = new File(tempDir, fileName); + Assert.assertTrue(file.createNewFile()); + + final TsFileResource resource = new TsFileResource(file); + resource.updateProgressIndex(new SimpleProgressIndex(1, flushOrderId)); + return resource; + } +} From 787c8d0d4d7ab631fd259fcde0b344b4d942a7b1 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Wed, 15 Apr 2026 16:12:57 +0800 Subject: [PATCH 2/7] Pipe: address historical tsfile dedup review comments --- ...icalDataRegionTsFileAndDeletionSource.java | 3 +- .../PipeTsFileEpochProgressIndexKeeper.java | 66 ++++++++++++++----- 2 files changed, 53 insertions(+), 16 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 187e75db22f2b..306f7d0ace424 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -879,7 +879,8 @@ private Event supplyTsFileEvent(final TsFileResource resource) { "Pipe {}@{}: failed to unpin skipped historical TsFileResource, original path: {}", pipeName, dataRegionId, - resource.getTsFilePath()); + resource.getTsFilePath(), + e); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java index b9b1ee3c41908..95b371c4bcf66 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java @@ -45,25 +45,52 @@ public synchronized void registerProgressIndex( public synchronized void eliminateProgressIndex( final int dataRegionId, final @Nonnull String taskScopeID, final String filePath) { - progressIndexKeeper - .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) - .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>()) - .remove(filePath); + final Map> scopeProgressIndexKeeper = + progressIndexKeeper.get(dataRegionId); + if (scopeProgressIndexKeeper == null) { + return; + } + + final Map tsFileProgressIndexKeeper = + scopeProgressIndexKeeper.get(taskScopeID); + if (tsFileProgressIndexKeeper == null) { + return; + } + + tsFileProgressIndexKeeper.remove(filePath); + if (tsFileProgressIndexKeeper.isEmpty()) { + scopeProgressIndexKeeper.remove(taskScopeID); + if (scopeProgressIndexKeeper.isEmpty()) { + progressIndexKeeper.remove(dataRegionId); + } + } } public synchronized void clearProgressIndex( final int dataRegionId, final @Nonnull String taskScopeID) { - progressIndexKeeper - .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) - .remove(taskScopeID); + final Map> scopeProgressIndexKeeper = + progressIndexKeeper.get(dataRegionId); + if (scopeProgressIndexKeeper == null) { + return; + } + + scopeProgressIndexKeeper.remove(taskScopeID); + if (scopeProgressIndexKeeper.isEmpty()) { + progressIndexKeeper.remove(dataRegionId); + } } public synchronized boolean containsTsFile( final int dataRegionId, final @Nonnull String taskScopeID, final String tsFilePath) { - return progressIndexKeeper - .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) - .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>()) - .containsKey(tsFilePath); + final Map> scopeProgressIndexKeeper = + progressIndexKeeper.get(dataRegionId); + if (scopeProgressIndexKeeper == null) { + return false; + } + + final Map tsFileProgressIndexKeeper = + scopeProgressIndexKeeper.get(taskScopeID); + return tsFileProgressIndexKeeper != null && tsFileProgressIndexKeeper.containsKey(tsFilePath); } public synchronized boolean isProgressIndexAfterOrEquals( @@ -71,10 +98,19 @@ public synchronized boolean isProgressIndexAfterOrEquals( final String taskScopeID, final String tsFilePath, final ProgressIndex progressIndex) { - return progressIndexKeeper - .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) - .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>()) - .entrySet() + final Map> scopeProgressIndexKeeper = + progressIndexKeeper.get(dataRegionId); + if (scopeProgressIndexKeeper == null) { + return false; + } + + final Map tsFileProgressIndexKeeper = + scopeProgressIndexKeeper.get(taskScopeID); + if (tsFileProgressIndexKeeper == null) { + return false; + } + + return tsFileProgressIndexKeeper.entrySet() .stream() .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath)) .map(Entry::getValue) From 3710c2fa3061d7f3b8a6b06b99a0d3ed4ce0568a Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Wed, 15 Apr 2026 16:31:20 +0800 Subject: [PATCH 3/7] spotless --- .../dataregion/realtime/PipeRealtimeDataRegionSource.java | 3 +-- .../realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java | 3 +-- .../assigner/PipeTsFileEpochProgressIndexKeeperTest.java | 3 ++- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index 1792e6a11928f..b1c6d130844a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -227,8 +227,7 @@ public void customize( creationTime = environment.getCreationTime(); pipeID = pipeName + "_" + creationTime; taskID = pipeName + "_" + dataRegionId + "_" + creationTime; - tsFileDedupScopeID = - taskID + "_" + Integer.toHexString(System.identityHashCode(environment)); + tsFileDedupScopeID = taskID + "_" + Integer.toHexString(System.identityHashCode(environment)); treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters); tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java index 95b371c4bcf66..aaf03f570e2da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java @@ -110,8 +110,7 @@ public synchronized boolean isProgressIndexAfterOrEquals( return false; } - return tsFileProgressIndexKeeper.entrySet() - .stream() + return tsFileProgressIndexKeeper.entrySet().stream() .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath)) .map(Entry::getValue) .filter(Objects::nonNull) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java index 8926d63c2ebad..4d27fff5d5772 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java @@ -61,7 +61,8 @@ public void testDuplicateTsFileLookupIsScopedByTaskInstance() throws IOException keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, resource); - Assert.assertTrue(keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_A, resource.getTsFilePath())); + Assert.assertTrue( + keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_A, resource.getTsFilePath())); Assert.assertFalse( keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_B, resource.getTsFilePath())); } From 9e753a9a23b0ac867ca9be9c971279b55c279d46 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Wed, 15 Apr 2026 16:58:05 +0800 Subject: [PATCH 4/7] Pipe: fix dedup scope cleanup and historical skip loop --- .../PipeCompactedTsFileInsertionEvent.java | 5 +- .../tsfile/PipeTsFileInsertionEvent.java | 57 ++++--- ...icalDataRegionTsFileAndDeletionSource.java | 74 +++++---- .../PipeRealtimeDataRegionSource.java | 2 +- .../assigner/PipeDataRegionAssigner.java | 1 + .../event/PipeTsFileInsertionEventTest.java | 93 +++++++++++ ...DataRegionTsFileAndDeletionSourceTest.java | 154 ++++++++++++++++++ 7 files changed, 330 insertions(+), 56 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java index 343c8d8932931..95ff0a25373fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java @@ -91,6 +91,7 @@ public PipeCompactedTsFileInsertionEvent( // init fields of PipeTsFileInsertionEvent flushPointCount = bindFlushPointCount(originalEvents); overridingProgressIndex = bindOverridingProgressIndex(originalEvents); + bindTsFileDedupScopeID(anyOfOriginalEvents.getTsFileDedupScopeID()); } private static boolean bindIsWithMod(Set originalEvents) { @@ -184,10 +185,10 @@ public boolean equalsInIoTConsensusV2(final Object o) { @Override public void eliminateProgressIndex() { - if (Objects.isNull(overridingProgressIndex)) { + if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(getTsFileDedupScopeID())) { for (final String originFilePath : originFilePaths) { PipeTsFileEpochProgressIndexKeeper.getInstance() - .eliminateProgressIndex(dataRegionId, pipeName, originFilePath); + .eliminateProgressIndex(dataRegionId, getTsFileDedupScopeID(), originFilePath); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 1505e15996fe6..adddc9d7ce5e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -99,6 +99,7 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent protected volatile ProgressIndex overridingProgressIndex; private Set tableNames; + private String tsFileDedupScopeID; // This is set to check the tsFile paths by privilege private Map treeSchemaMap; @@ -398,13 +399,26 @@ public ProgressIndex forceGetProgressIndex() { } public void eliminateProgressIndex() { - if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) { + if (Objects.isNull(overridingProgressIndex) + && Objects.nonNull(resource) + && Objects.nonNull(tsFileDedupScopeID)) { PipeTsFileEpochProgressIndexKeeper.getInstance() .eliminateProgressIndex( - Integer.parseInt(resource.getDataRegionId()), pipeName, resource.getTsFilePath()); + Integer.parseInt(resource.getDataRegionId()), + tsFileDedupScopeID, + resource.getTsFilePath()); } } + public PipeTsFileInsertionEvent bindTsFileDedupScopeID(final String tsFileDedupScopeID) { + this.tsFileDedupScopeID = tsFileDedupScopeID; + return this; + } + + public String getTsFileDedupScopeID() { + return tsFileDedupScopeID; + } + @Override public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, @@ -419,25 +433,26 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep final long startTime, final long endTime) { return new PipeTsFileInsertionEvent( - getRawIsTableModelEvent(), - getSourceDatabaseNameFromDataRegion(), - resource, - tsFile, - isWithMod, - isLoaded, - isGeneratedByHistoricalExtractor, - tableNames, - pipeName, - creationTime, - pipeTaskMeta, - treePattern, - tablePattern, - userId, - userName, - cliHostname, - skipIfNoPrivileges, - startTime, - endTime); + getRawIsTableModelEvent(), + getSourceDatabaseNameFromDataRegion(), + resource, + tsFile, + isWithMod, + isLoaded, + isGeneratedByHistoricalExtractor, + tableNames, + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + userId, + userName, + cliHostname, + skipIfNoPrivileges, + startTime, + endTime) + .bindTsFileDedupScopeID(tsFileDedupScopeID); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 306f7d0ace424..6a0eb53df7ec9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -814,14 +814,19 @@ public synchronized Event supply() { return null; } - final PersistentResource resource = pendingQueue.poll(); - if (resource == null) { - return supplyTerminateEvent(); - } else if (resource instanceof TsFileResource) { - return supplyTsFileEvent((TsFileResource) resource); - } else { + PersistentResource resource; + while ((resource = pendingQueue.poll()) != null) { + if (resource instanceof TsFileResource) { + final TsFileResource tsFileResource = (TsFileResource) resource; + if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) { + continue; + } + return supplyTsFileEvent(tsFileResource); + } return supplyDeletionEvent((DeletionResource) resource); } + + return supplyTerminateEvent(); } private Event supplyTerminateEvent() { @@ -844,7 +849,37 @@ private Event supplyTerminateEvent() { return terminateEvent; } - private Event supplyTsFileEvent(final TsFileResource resource) { + protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(final TsFileResource resource) { + if (!filteredTsFileResources2TableNames.containsKey(resource) + || !shouldSkipHistoricalTsFileEvent(resource)) { + return false; + } + + filteredTsFileResources2TableNames.remove(resource); + LOGGER.info( + "Pipe {}@{}: skip historical tsfile {} because realtime source in current task {} has already captured it.", + pipeName, + dataRegionId, + resource.getTsFilePath(), + tsFileDedupScopeID); + try { + return true; + } finally { + try { + PipeDataNodeResourceManager.tsfile() + .unpinTsFileResource(resource, shouldTransferModFile, pipeName); + } catch (final IOException e) { + LOGGER.warn( + "Pipe {}@{}: failed to unpin skipped historical TsFileResource, original path: {}", + pipeName, + dataRegionId, + resource.getTsFilePath(), + e); + } + } + } + + protected Event supplyTsFileEvent(final TsFileResource resource) { if (!filteredTsFileResources2TableNames.containsKey(resource)) { final ProgressReportEvent progressReportEvent = new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta); @@ -860,31 +895,6 @@ private Event supplyTsFileEvent(final TsFileResource resource) { return isReferenceCountIncreased ? progressReportEvent : null; } - if (shouldSkipHistoricalTsFileEvent(resource)) { - filteredTsFileResources2TableNames.remove(resource); - LOGGER.info( - "Pipe {}@{}: skip historical tsfile {} because realtime source in current task {} has already captured it.", - pipeName, - dataRegionId, - resource.getTsFilePath(), - tsFileDedupScopeID); - try { - return null; - } finally { - try { - PipeDataNodeResourceManager.tsfile() - .unpinTsFileResource(resource, shouldTransferModFile, pipeName); - } catch (final IOException e) { - LOGGER.warn( - "Pipe {}@{}: failed to unpin skipped historical TsFileResource, original path: {}", - pipeName, - dataRegionId, - resource.getTsFilePath(), - e); - } - } - } - final PipeTsFileInsertionEvent event = new PipeTsFileInsertionEvent( isModelDetected ? isTableModel : null, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index b1c6d130844a3..bb804a899c45c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -652,7 +652,7 @@ public String getTaskID() { return taskID; } - protected String getTsFileDedupScopeID() { + public final String getTsFileDedupScopeID() { return tsFileDedupScopeID; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 9c7182f051c74..bdeebde8938e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -190,6 +190,7 @@ private void assignToSource( if (innerEvent instanceof PipeTsFileInsertionEvent) { final PipeTsFileInsertionEvent tsFileInsertionEvent = (PipeTsFileInsertionEvent) innerEvent; + tsFileInsertionEvent.bindTsFileDedupScopeID(source.getTsFileDedupScopeID()); tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile()); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java index 5ba0843bf8003..82c52f39ec718 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java @@ -23,13 +23,17 @@ import org.apache.iotdb.commons.audit.IAuditEntity; import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeCompactedTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.iotdb.db.queryengine.plan.relational.security.TreeAccessCheckContext; @@ -50,8 +54,11 @@ import org.apache.tsfile.read.common.TimeRange; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -157,6 +164,92 @@ public void testAuthCheck() throws Exception { } } + @Test + public void testTsFileDedupScopeIdIsPreservedForCleanupAndCopy() throws Exception { + final PipeTsFileEpochProgressIndexKeeper keeper = + PipeTsFileEpochProgressIndexKeeper.getInstance(); + final int dataRegionId = 1; + final String scopeA = "scope-a"; + final String scopeB = "scope-b"; + final File tempDir = Files.createTempDirectory("pipeTsFileDedupScope").toFile(); + + try { + final TsFileResource sourceResource = + createSpyTsFileResource(tempDir, "source.tsfile", 1L, dataRegionId); + keeper.registerProgressIndex(dataRegionId, scopeA, sourceResource); + keeper.registerProgressIndex(dataRegionId, scopeB, sourceResource); + + final PipeTsFileInsertionEvent sourceEvent = + new PipeTsFileInsertionEvent( + true, + "db", + sourceResource, + null, + true, + false, + false, + Collections.singleton("table"), + "pipe", + 1L, + null, + null, + null, + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE) + .bindTsFileDedupScopeID(scopeA); + + sourceEvent.eliminateProgressIndex(); + Assert.assertFalse(keeper.containsTsFile(dataRegionId, scopeA, sourceResource.getTsFilePath())); + Assert.assertTrue(keeper.containsTsFile(dataRegionId, scopeB, sourceResource.getTsFilePath())); + + keeper.registerProgressIndex(dataRegionId, scopeA, sourceResource); + final PipeTsFileInsertionEvent copiedEvent = + sourceEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport( + "pipe", 2L, null, null, null, null, null, null, true, Long.MIN_VALUE, Long.MAX_VALUE); + Assert.assertEquals(scopeA, copiedEvent.getTsFileDedupScopeID()); + copiedEvent.eliminateProgressIndex(); + Assert.assertFalse(keeper.containsTsFile(dataRegionId, scopeA, sourceResource.getTsFilePath())); + Assert.assertTrue(keeper.containsTsFile(dataRegionId, scopeB, sourceResource.getTsFilePath())); + + keeper.registerProgressIndex(dataRegionId, scopeA, sourceResource); + final TsFileResource compactedResource = + createSpyTsFileResource(tempDir, "compacted.tsfile", 2L, dataRegionId); + final PipeCompactedTsFileInsertionEvent compactedEvent = + new PipeCompactedTsFileInsertionEvent( + new CommitterKey("pipe", 1L, dataRegionId, 0), + Collections.singleton(sourceEvent), + sourceEvent, + compactedResource, + true); + Assert.assertEquals(scopeA, compactedEvent.getTsFileDedupScopeID()); + compactedEvent.eliminateProgressIndex(); + Assert.assertFalse(keeper.containsTsFile(dataRegionId, scopeA, sourceResource.getTsFilePath())); + Assert.assertTrue(keeper.containsTsFile(dataRegionId, scopeB, sourceResource.getTsFilePath())); + } finally { + keeper.clearProgressIndex(dataRegionId, scopeA); + keeper.clearProgressIndex(dataRegionId, scopeB); + FileUtils.deleteFileOrDirectory(tempDir); + } + } + + private TsFileResource createSpyTsFileResource( + final File tempDir, final String fileName, final long flushOrderId, final int dataRegionId) + throws IOException { + final File file = new File(tempDir, fileName); + Assert.assertTrue(file.createNewFile()); + + final TsFileResource resource = new TsFileResource(file); + resource.updateProgressIndex(new SimpleProgressIndex(1, flushOrderId)); + + final TsFileResource spyResource = Mockito.spy(resource); + Mockito.doReturn(String.valueOf(dataRegionId)).when(spyResource).getDataRegionId(); + return spyResource; + } + static class TestAccessControl implements AccessControl { @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java new file mode 100644 index 0000000000000..e0c781ab65f2f --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.source.dataregion.historical; + +import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource; +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.pipe.api.event.Event; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.file.Files; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class PipeHistoricalDataRegionTsFileAndDeletionSourceTest { + + @Test + public void testSupplyContinuesAfterSkippingDuplicateHistoricalTsFile() throws Exception { + final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source = + new TestablePipeHistoricalDataRegionTsFileAndDeletionSource(); + final Event expectedEvent = new Event() {}; + final File tempDir = Files.createTempDirectory("pipeHistoricalSkipDuplicate").toFile(); + + try { + final TsFileResource skippedResource = createTsFileResource(tempDir, "skip.tsfile"); + final TsFileResource nextResource = createTsFileResource(tempDir, "next.tsfile"); + + source.setSkippedTsFilePaths(skippedResource.getTsFilePath()); + source.setSuppliedEvent(expectedEvent); + setPrivateField(source, "hasBeenStarted", true); + setPrivateField( + source, + "pendingQueue", + new ArrayDeque(Arrays.asList(skippedResource, nextResource))); + + Assert.assertSame(expectedEvent, source.supply()); + Assert.assertEquals( + Arrays.asList(skippedResource.getTsFilePath()), source.getConsumedSkippedTsFilePaths()); + Assert.assertEquals(Arrays.asList(nextResource.getTsFilePath()), source.getSuppliedTsFiles()); + } finally { + FileUtils.deleteFileOrDirectory(tempDir); + } + } + + @Test + public void testSupplyDoesNotSwallowNonSkippedNullTsFileEvent() throws Exception { + final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source = + new TestablePipeHistoricalDataRegionTsFileAndDeletionSource(); + final File tempDir = Files.createTempDirectory("pipeHistoricalNullSemantics").toFile(); + + try { + final TsFileResource firstResource = createTsFileResource(tempDir, "first.tsfile"); + final TsFileResource secondResource = createTsFileResource(tempDir, "second.tsfile"); + + source.setSuppliedEvent(null); + setPrivateField(source, "hasBeenStarted", true); + setPrivateField( + source, + "pendingQueue", + new ArrayDeque(Arrays.asList(firstResource, secondResource))); + + Assert.assertNull(source.supply()); + Assert.assertEquals(Arrays.asList(firstResource.getTsFilePath()), source.getSuppliedTsFiles()); + Assert.assertEquals(1, source.getPendingQueueSize()); + } finally { + FileUtils.deleteFileOrDirectory(tempDir); + } + } + + private static TsFileResource createTsFileResource(final File tempDir, final String fileName) + throws IOException { + final File file = new File(tempDir, fileName); + Assert.assertTrue(file.createNewFile()); + return new TsFileResource(file); + } + + private static void setPrivateField( + final PipeHistoricalDataRegionTsFileAndDeletionSource source, + final String fieldName, + final Object value) + throws ReflectiveOperationException { + final Field field = + PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(source, value); + } + + private static class TestablePipeHistoricalDataRegionTsFileAndDeletionSource + extends PipeHistoricalDataRegionTsFileAndDeletionSource { + + private final Set skippedTsFilePaths = new HashSet<>(); + private final List consumedSkippedTsFilePaths = new ArrayList<>(); + private final List suppliedTsFiles = new ArrayList<>(); + private Event suppliedEvent; + + private void setSkippedTsFilePaths(final String... skippedTsFilePaths) { + this.skippedTsFilePaths.clear(); + this.skippedTsFilePaths.addAll(Arrays.asList(skippedTsFilePaths)); + } + + private List getConsumedSkippedTsFilePaths() { + return consumedSkippedTsFilePaths; + } + + private List getSuppliedTsFiles() { + return suppliedTsFiles; + } + + private void setSuppliedEvent(final Event suppliedEvent) { + this.suppliedEvent = suppliedEvent; + } + + @Override + protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(final TsFileResource resource) { + if (!skippedTsFilePaths.contains(resource.getTsFilePath())) { + return false; + } + consumedSkippedTsFilePaths.add(resource.getTsFilePath()); + return true; + } + + @Override + protected Event supplyTsFileEvent(final TsFileResource resource) { + suppliedTsFiles.add(resource.getTsFilePath()); + return suppliedEvent; + } + } +} From 1382798224ba383091e89fe7fdbc213117cfaeba Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Wed, 15 Apr 2026 17:06:10 +0800 Subject: [PATCH 5/7] spotless --- .../event/PipeTsFileInsertionEventTest.java | 18 ++++++++++++------ ...lDataRegionTsFileAndDeletionSourceTest.java | 6 ++++-- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java index 82c52f39ec718..db5452e0b92a2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java @@ -203,8 +203,10 @@ public void testTsFileDedupScopeIdIsPreservedForCleanupAndCopy() throws Exceptio .bindTsFileDedupScopeID(scopeA); sourceEvent.eliminateProgressIndex(); - Assert.assertFalse(keeper.containsTsFile(dataRegionId, scopeA, sourceResource.getTsFilePath())); - Assert.assertTrue(keeper.containsTsFile(dataRegionId, scopeB, sourceResource.getTsFilePath())); + Assert.assertFalse( + keeper.containsTsFile(dataRegionId, scopeA, sourceResource.getTsFilePath())); + Assert.assertTrue( + keeper.containsTsFile(dataRegionId, scopeB, sourceResource.getTsFilePath())); keeper.registerProgressIndex(dataRegionId, scopeA, sourceResource); final PipeTsFileInsertionEvent copiedEvent = @@ -212,8 +214,10 @@ public void testTsFileDedupScopeIdIsPreservedForCleanupAndCopy() throws Exceptio "pipe", 2L, null, null, null, null, null, null, true, Long.MIN_VALUE, Long.MAX_VALUE); Assert.assertEquals(scopeA, copiedEvent.getTsFileDedupScopeID()); copiedEvent.eliminateProgressIndex(); - Assert.assertFalse(keeper.containsTsFile(dataRegionId, scopeA, sourceResource.getTsFilePath())); - Assert.assertTrue(keeper.containsTsFile(dataRegionId, scopeB, sourceResource.getTsFilePath())); + Assert.assertFalse( + keeper.containsTsFile(dataRegionId, scopeA, sourceResource.getTsFilePath())); + Assert.assertTrue( + keeper.containsTsFile(dataRegionId, scopeB, sourceResource.getTsFilePath())); keeper.registerProgressIndex(dataRegionId, scopeA, sourceResource); final TsFileResource compactedResource = @@ -227,8 +231,10 @@ public void testTsFileDedupScopeIdIsPreservedForCleanupAndCopy() throws Exceptio true); Assert.assertEquals(scopeA, compactedEvent.getTsFileDedupScopeID()); compactedEvent.eliminateProgressIndex(); - Assert.assertFalse(keeper.containsTsFile(dataRegionId, scopeA, sourceResource.getTsFilePath())); - Assert.assertTrue(keeper.containsTsFile(dataRegionId, scopeB, sourceResource.getTsFilePath())); + Assert.assertFalse( + keeper.containsTsFile(dataRegionId, scopeA, sourceResource.getTsFilePath())); + Assert.assertTrue( + keeper.containsTsFile(dataRegionId, scopeB, sourceResource.getTsFilePath())); } finally { keeper.clearProgressIndex(dataRegionId, scopeA); keeper.clearProgressIndex(dataRegionId, scopeB); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java index e0c781ab65f2f..cfb36823ec215 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java @@ -86,7 +86,8 @@ public void testSupplyDoesNotSwallowNonSkippedNullTsFileEvent() throws Exception new ArrayDeque(Arrays.asList(firstResource, secondResource))); Assert.assertNull(source.supply()); - Assert.assertEquals(Arrays.asList(firstResource.getTsFilePath()), source.getSuppliedTsFiles()); + Assert.assertEquals( + Arrays.asList(firstResource.getTsFilePath()), source.getSuppliedTsFiles()); Assert.assertEquals(1, source.getPendingQueueSize()); } finally { FileUtils.deleteFileOrDirectory(tempDir); @@ -137,7 +138,8 @@ private void setSuppliedEvent(final Event suppliedEvent) { } @Override - protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(final TsFileResource resource) { + protected boolean consumeSkippedHistoricalTsFileEventIfNecessary( + final TsFileResource resource) { if (!skippedTsFilePaths.contains(resource.getTsFilePath())) { return false; } From c7e44c333847ef3b08fae9eaa248ae338e986889 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Thu, 16 Apr 2026 01:03:57 +0800 Subject: [PATCH 6/7] Refine historical tsfile dedup supply semantics --- ...icalDataRegionTsFileAndDeletionSource.java | 49 ++++++++++--------- ...DataRegionTsFileAndDeletionSourceTest.java | 17 ++++++- 2 files changed, 41 insertions(+), 25 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 6a0eb53df7ec9..2ca283b139551 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -814,19 +814,18 @@ public synchronized Event supply() { return null; } - PersistentResource resource; - while ((resource = pendingQueue.poll()) != null) { - if (resource instanceof TsFileResource) { - final TsFileResource tsFileResource = (TsFileResource) resource; - if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) { - continue; - } - return supplyTsFileEvent(tsFileResource); - } - return supplyDeletionEvent((DeletionResource) resource); + final PersistentResource resource = pendingQueue.poll(); + if (resource == null) { + return supplyTerminateEvent(); } - return supplyTerminateEvent(); + if (resource instanceof TsFileResource) { + final TsFileResource tsFileResource = (TsFileResource) resource; + return consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource) + ? supplyProgressReportEvent(tsFileResource.getMaxProgressIndex()) + : supplyTsFileEvent(tsFileResource); + } + return supplyDeletionEvent((DeletionResource) resource); } private Event supplyTerminateEvent() { @@ -879,20 +878,24 @@ protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(final TsFileRes } } + protected Event supplyProgressReportEvent(final ProgressIndex progressIndex) { + final ProgressReportEvent progressReportEvent = + new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta); + progressReportEvent.bindProgressIndex(progressIndex); + final boolean isReferenceCountIncreased = + progressReportEvent.increaseReferenceCount( + PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName()); + if (!isReferenceCountIncreased) { + LOGGER.warn( + "The reference count of the event {} cannot be increased, skipping it.", + progressReportEvent); + } + return isReferenceCountIncreased ? progressReportEvent : null; + } + protected Event supplyTsFileEvent(final TsFileResource resource) { if (!filteredTsFileResources2TableNames.containsKey(resource)) { - final ProgressReportEvent progressReportEvent = - new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta); - progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex()); - final boolean isReferenceCountIncreased = - progressReportEvent.increaseReferenceCount( - PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName()); - if (!isReferenceCountIncreased) { - LOGGER.warn( - "The reference count of the event {} cannot be increased, skipping it.", - progressReportEvent); - } - return isReferenceCountIncreased ? progressReportEvent : null; + return supplyProgressReportEvent(resource.getMaxProgressIndex()); } final PipeTsFileInsertionEvent event = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java index cfb36823ec215..00fc34b1c71ce 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.source.dataregion.historical; import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource; +import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.pipe.api.event.Event; @@ -41,7 +42,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSourceTest { @Test - public void testSupplyContinuesAfterSkippingDuplicateHistoricalTsFile() throws Exception { + public void testSupplyReturnsProgressReportEventAfterSkippingDuplicateHistoricalTsFile() + throws Exception { final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source = new TestablePipeHistoricalDataRegionTsFileAndDeletionSource(); final Event expectedEvent = new Event() {}; @@ -59,9 +61,13 @@ public void testSupplyContinuesAfterSkippingDuplicateHistoricalTsFile() throws E "pendingQueue", new ArrayDeque(Arrays.asList(skippedResource, nextResource))); - Assert.assertSame(expectedEvent, source.supply()); + Assert.assertTrue(source.supply() instanceof ProgressReportEvent); Assert.assertEquals( Arrays.asList(skippedResource.getTsFilePath()), source.getConsumedSkippedTsFilePaths()); + Assert.assertTrue(source.getSuppliedTsFiles().isEmpty()); + Assert.assertEquals(1, source.getPendingQueueSize()); + + Assert.assertSame(expectedEvent, source.supply()); Assert.assertEquals(Arrays.asList(nextResource.getTsFilePath()), source.getSuppliedTsFiles()); } finally { FileUtils.deleteFileOrDirectory(tempDir); @@ -133,6 +139,13 @@ private List getSuppliedTsFiles() { return suppliedTsFiles; } + private int getPendingQueueSize() throws ReflectiveOperationException { + final Field field = + PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredField("pendingQueue"); + field.setAccessible(true); + return ((ArrayDeque) field.get(this)).size(); + } + private void setSuppliedEvent(final Event suppliedEvent) { this.suppliedEvent = suppliedEvent; } From afb8244ebba9997ffa1975fe28858339494cce8c Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Thu, 16 Apr 2026 10:44:30 +0800 Subject: [PATCH 7/7] spotless --- ...icalDataRegionTsFileAndDeletionSourceTest.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java index 00fc34b1c71ce..14f97ef79d6b7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java @@ -139,11 +139,16 @@ private List getSuppliedTsFiles() { return suppliedTsFiles; } - private int getPendingQueueSize() throws ReflectiveOperationException { - final Field field = - PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredField("pendingQueue"); - field.setAccessible(true); - return ((ArrayDeque) field.get(this)).size(); + @Override + public int getPendingQueueSize() { + try { + final Field field = + PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredField("pendingQueue"); + field.setAccessible(true); + return ((ArrayDeque) field.get(this)).size(); + } catch (final ReflectiveOperationException e) { + throw new AssertionError(e); + } } private void setSuppliedEvent(final Event suppliedEvent) {