Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public PipeCompactedTsFileInsertionEvent(
// init fields of PipeTsFileInsertionEvent
flushPointCount = bindFlushPointCount(originalEvents);
overridingProgressIndex = bindOverridingProgressIndex(originalEvents);
bindTsFileDedupScopeID(anyOfOriginalEvents.getTsFileDedupScopeID());
}

private static boolean bindIsWithMod(Set<PipeTsFileInsertionEvent> originalEvents) {
Expand Down Expand Up @@ -184,10 +185,10 @@ public boolean equalsInIoTConsensusV2(final Object o) {

@Override
public void eliminateProgressIndex() {
if (Objects.isNull(overridingProgressIndex)) {
if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(getTsFileDedupScopeID())) {
for (final String originFilePath : originFilePaths) {
PipeTsFileEpochProgressIndexKeeper.getInstance()
.eliminateProgressIndex(dataRegionId, pipeName, originFilePath);
.eliminateProgressIndex(dataRegionId, getTsFileDedupScopeID(), originFilePath);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent

protected volatile ProgressIndex overridingProgressIndex;
private Set<String> tableNames;
private String tsFileDedupScopeID;

// This is set to check the tsFile paths by privilege
private Map<IDeviceID, String[]> treeSchemaMap;
Expand Down Expand Up @@ -398,13 +399,26 @@ public ProgressIndex forceGetProgressIndex() {
}

public void eliminateProgressIndex() {
if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
if (Objects.isNull(overridingProgressIndex)
&& Objects.nonNull(resource)
&& Objects.nonNull(tsFileDedupScopeID)) {
PipeTsFileEpochProgressIndexKeeper.getInstance()
.eliminateProgressIndex(
Integer.parseInt(resource.getDataRegionId()), pipeName, resource.getTsFilePath());
Integer.parseInt(resource.getDataRegionId()),
tsFileDedupScopeID,
resource.getTsFilePath());
}
}

public PipeTsFileInsertionEvent bindTsFileDedupScopeID(final String tsFileDedupScopeID) {
this.tsFileDedupScopeID = tsFileDedupScopeID;
return this;
}

public String getTsFileDedupScopeID() {
return tsFileDedupScopeID;
}

@Override
public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
Expand All @@ -419,25 +433,26 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep
final long startTime,
final long endTime) {
return new PipeTsFileInsertionEvent(
getRawIsTableModelEvent(),
getSourceDatabaseNameFromDataRegion(),
resource,
tsFile,
isWithMod,
isLoaded,
isGeneratedByHistoricalExtractor,
tableNames,
pipeName,
creationTime,
pipeTaskMeta,
treePattern,
tablePattern,
userId,
userName,
cliHostname,
skipIfNoPrivileges,
startTime,
endTime);
getRawIsTableModelEvent(),
getSourceDatabaseNameFromDataRegion(),
resource,
tsFile,
isWithMod,
isLoaded,
isGeneratedByHistoricalExtractor,
tableNames,
pipeName,
creationTime,
pipeTaskMeta,
treePattern,
tablePattern,
userId,
userName,
cliHostname,
skipIfNoPrivileges,
startTime,
endTime)
.bindTsFileDedupScopeID(tsFileDedupScopeID);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
Expand Down Expand Up @@ -124,6 +125,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource

private String pipeName;
private long creationTime;
private String tsFileDedupScopeID;

private PipeTaskMeta pipeTaskMeta;
private ProgressIndex startIndex;
Expand Down Expand Up @@ -320,6 +322,14 @@ public void customize(
}

dataRegionId = environment.getRegionId();
tsFileDedupScopeID =
pipeName
+ "_"
+ dataRegionId
+ "_"
+ creationTime
+ "_"
+ Integer.toHexString(System.identityHashCode(environment));

treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters);
Expand Down Expand Up @@ -807,11 +817,15 @@ public synchronized Event supply() {
final PersistentResource resource = pendingQueue.poll();
if (resource == null) {
return supplyTerminateEvent();
} else if (resource instanceof TsFileResource) {
return supplyTsFileEvent((TsFileResource) resource);
} else {
return supplyDeletionEvent((DeletionResource) resource);
}

if (resource instanceof TsFileResource) {
final TsFileResource tsFileResource = (TsFileResource) resource;
return consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)
? supplyProgressReportEvent(tsFileResource.getMaxProgressIndex())
: supplyTsFileEvent(tsFileResource);
}
return supplyDeletionEvent((DeletionResource) resource);
}

private Event supplyTerminateEvent() {
Expand All @@ -834,20 +848,54 @@ private Event supplyTerminateEvent() {
return terminateEvent;
}

private Event supplyTsFileEvent(final TsFileResource resource) {
if (!filteredTsFileResources2TableNames.containsKey(resource)) {
final ProgressReportEvent progressReportEvent =
new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex());
final boolean isReferenceCountIncreased =
progressReportEvent.increaseReferenceCount(
PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName());
if (!isReferenceCountIncreased) {
protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(final TsFileResource resource) {
if (!filteredTsFileResources2TableNames.containsKey(resource)
|| !shouldSkipHistoricalTsFileEvent(resource)) {
return false;
}

filteredTsFileResources2TableNames.remove(resource);
LOGGER.info(
"Pipe {}@{}: skip historical tsfile {} because realtime source in current task {} has already captured it.",
pipeName,
dataRegionId,
resource.getTsFilePath(),
tsFileDedupScopeID);
try {
return true;
} finally {
try {
PipeDataNodeResourceManager.tsfile()
.unpinTsFileResource(resource, shouldTransferModFile, pipeName);
} catch (final IOException e) {
LOGGER.warn(
"The reference count of the event {} cannot be increased, skipping it.",
progressReportEvent);
"Pipe {}@{}: failed to unpin skipped historical TsFileResource, original path: {}",
pipeName,
dataRegionId,
resource.getTsFilePath(),
e);
}
return isReferenceCountIncreased ? progressReportEvent : null;
}
}

protected Event supplyProgressReportEvent(final ProgressIndex progressIndex) {
final ProgressReportEvent progressReportEvent =
new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
progressReportEvent.bindProgressIndex(progressIndex);
final boolean isReferenceCountIncreased =
progressReportEvent.increaseReferenceCount(
PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName());
if (!isReferenceCountIncreased) {
LOGGER.warn(
"The reference count of the event {} cannot be increased, skipping it.",
progressReportEvent);
}
return isReferenceCountIncreased ? progressReportEvent : null;
}

protected Event supplyTsFileEvent(final TsFileResource resource) {
if (!filteredTsFileResources2TableNames.containsKey(resource)) {
return supplyProgressReportEvent(resource.getMaxProgressIndex());
}

final PipeTsFileInsertionEvent event =
Expand Down Expand Up @@ -916,6 +964,13 @@ private Event supplyTsFileEvent(final TsFileResource resource) {
}
}

private boolean shouldSkipHistoricalTsFileEvent(final TsFileResource resource) {
return pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
&& DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
&& PipeTsFileEpochProgressIndexKeeper.getInstance()
.containsTsFile(dataRegionId, tsFileDedupScopeID, resource.getTsFilePath());
}

private Event supplyDeletionEvent(final DeletionResource deletionResource) {
final PipeDeleteDataNodeEvent event =
new PipeDeleteDataNodeEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ private void extractTabletInsertion(final PipeRealtimeEvent event) {
if (canNotUseTabletAnymore(event)) {
event.getTsFileEpoch().migrateState(this, curState -> TsFileEpoch.State.USING_TSFILE);
PipeTsFileEpochProgressIndexKeeper.getInstance()
.registerProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getResource());
.registerProgressIndex(
dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getResource());
} else {
event
.getTsFileEpoch()
Expand Down Expand Up @@ -156,7 +157,8 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
case USING_TABLET:
// If the state is USING_TABLET, discard the event
PipeTsFileEpochProgressIndexKeeper.getInstance()
.eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath());
.eliminateProgressIndex(
dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getFilePath());
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(), false);
return;
case EMPTY:
Expand Down Expand Up @@ -283,7 +285,8 @@ private Event supplyTsFileInsertion(final PipeRealtimeEvent event) {
PipeDataNodeAgent.runtime()
.report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
PipeTsFileEpochProgressIndexKeeper.getInstance()
.eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath());
.eliminateProgressIndex(
dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getFilePath());
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {

protected String pipeID;
private String taskID;
private String tsFileDedupScopeID;
protected long userId;
protected String userName;
protected String cliHostname;
Expand Down Expand Up @@ -226,6 +227,7 @@ public void customize(
creationTime = environment.getCreationTime();
pipeID = pipeName + "_" + creationTime;
taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
tsFileDedupScopeID = taskID + "_" + Integer.toHexString(System.identityHashCode(environment));

treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters);
Expand Down Expand Up @@ -322,6 +324,8 @@ public void close() throws Exception {
if (dataRegionId >= 0) {
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, this);
PipeTimePartitionListener.getInstance().stopListen(dataRegionId, this);
PipeTsFileEpochProgressIndexKeeper.getInstance()
.clearProgressIndex(dataRegionId, tsFileDedupScopeID);
}

synchronized (isClosed) {
Expand Down Expand Up @@ -576,7 +580,7 @@ private void maySkipProgressIndexForRealtimeEvent(final PipeRealtimeEvent event)
if (PipeTsFileEpochProgressIndexKeeper.getInstance()
.isProgressIndexAfterOrEquals(
dataRegionId,
pipeName,
tsFileDedupScopeID,
event.getTsFileEpoch().getFilePath(),
getProgressIndex4RealtimeEvent(event))) {
event.skipReportOnCommit();
Expand Down Expand Up @@ -648,6 +652,10 @@ public String getTaskID() {
return taskID;
}

public final String getTsFileDedupScopeID() {
return tsFileDedupScopeID;
}

public void increaseExtractEpochSize() {
extractEpochSize.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ protected void doExtract(PipeRealtimeEvent event) {

event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
PipeTsFileEpochProgressIndexKeeper.getInstance()
.registerProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getResource());
.registerProgressIndex(
dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getResource());

if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(), false);
Expand Down Expand Up @@ -104,7 +105,9 @@ public Event supply() {
.report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
PipeTsFileEpochProgressIndexKeeper.getInstance()
.eliminateProgressIndex(
dataRegionId, pipeName, realtimeEvent.getTsFileEpoch().getFilePath());
dataRegionId,
getTsFileDedupScopeID(),
realtimeEvent.getTsFileEpoch().getFilePath());
}

realtimeEvent.decreaseReferenceCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ private void assignToSource(
if (innerEvent instanceof PipeTsFileInsertionEvent) {
final PipeTsFileInsertionEvent tsFileInsertionEvent =
(PipeTsFileInsertionEvent) innerEvent;
tsFileInsertionEvent.bindTsFileDedupScopeID(source.getTsFileDedupScopeID());
tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile());
}

Expand Down
Loading
Loading