Skip to content

[Pipe] Deduplicate historical tsfile events in IoTConsensusV2 pipes#17472

Open
Pengzna wants to merge 7 commits intoapache:masterfrom
Pengzna:codex/iotv2-historical-tsfile-dedup
Open

[Pipe] Deduplicate historical tsfile events in IoTConsensusV2 pipes#17472
Pengzna wants to merge 7 commits intoapache:masterfrom
Pengzna:codex/iotv2-historical-tsfile-dedup

Conversation

@Pengzna
Copy link
Copy Markdown
Collaborator

@Pengzna Pengzna commented Apr 14, 2026

Why

In IoTConsensusV2 batch mode, the same logical tsfile can be observed by both the realtime source and the historical source in the same consensus pipe task. When that happens, the duplicated historical tsfile event may allocate another replicateIndex for an already captured file and cause redundant receiver-side seal/retry behavior.

What Changed

  • introduce a per-task-instance tsfile dedup scope id shared by the realtime and historical sources
  • let the realtime source register captured tsfile paths by task scope
  • let the historical source skip a tsfile event if the realtime source in the same task has already captured that file
  • clear the scoped progress index cache when the realtime source closes
  • add unit tests to verify the scoped keeper behavior

Verification

  • mvn -pl iotdb-core/datanode -DskipITs -Dtest=PipeTsFileEpochProgressIndexKeeperTest test
  • mvn clean package -DskipTests -T 1C
  • local 3C3D IoTDB repro with data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensusV2
  • workload: DEVICE_NUMBER=10, SENSOR_NUMBER=1, BATCH_SIZE_PER_WRITE=100000, LOOP=10
  • executed 3 rounds of iot-benchmark -> flush
  • observed logs such as skip historical tsfile ... because realtime source in current task ... has already captured it
  • no receiver-side writing file null is not available, Failed to seal file, or 2204 errors were observed in this repro

Copilot AI review requested due to automatic review settings April 14, 2026 07:30
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses duplicate tsfile transfer in IoTConsensusV2 “batch mode” pipes by introducing a per-task-instance dedup scope shared between the realtime and historical sources, allowing the historical source to skip tsfiles already captured by the realtime source.

Changes:

  • Add a per-task tsFileDedupScopeID and use it as the key for tsfile progress index tracking/dedup.
  • Register realtime-captured tsfiles into a scoped keeper and have the historical source skip duplicates under the same scope.
  • Add unit tests covering scoped behavior (contains/check/clear) of the keeper.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java Switch keeper keying from pipeName to task-scope ID; add scoped clear + contains APIs.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java Generate/store task-scope ID and clear scoped keeper state on close; use scope in progress-index checks.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java Register/eliminate tsfile progress index using the new task-scope ID.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java Register/eliminate tsfile progress index using the new task-scope ID.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java Compute matching task-scope ID and skip historical tsfile events already captured by realtime under that scope.
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java New unit tests verifying the keeper’s scope isolation and scoped clearing.
Comments suppressed due to low confidence (1)

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java:83

  • isProgressIndexAfterOrEquals name doesn’t match its predicate: !resource.getMaxProgressIndex().isAfter(progressIndex) is equivalent to maxProgressIndex <= progressIndex (i.e., before or equal), not after or equal. This makes the API very easy to misuse. Please either invert the comparison to match the name, or rename the method to reflect the current behavior (and adjust tests/callers accordingly).
  public synchronized boolean isProgressIndexAfterOrEquals(
      final int dataRegionId,
      final String taskScopeID,
      final String tsFilePath,
      final ProgressIndex progressIndex) {
    return progressIndexKeeper
        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
        .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
        .entrySet()
        .stream()
        .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
        .map(Entry::getValue)
        .filter(Objects::nonNull)
        .anyMatch(resource -> !resource.getMaxProgressIndex().isAfter(progressIndex));
  }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown
Collaborator

@Caideyipi Caideyipi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants