diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionMessageIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionMessageIT.java new file mode 100644 index 0000000000000..66376c6130168 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionMessageIT.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.subscription.it.local; + +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeException; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; +import org.apache.iotdb.session.subscription.SubscriptionSession; +import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.List; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.locks.LockSupport; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class}) +public class IoTDBSubscriptionMessageIT extends AbstractSubscriptionLocalIT { + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + } + + @Ignore + @Test + public void testPullConsumerCommitAfterRemoveUserData() throws Exception { + final String topicName = "topic_remove_user_data"; + insertHistoricalData(0, 100); + createTopic(topicName); + + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + try (final SubscriptionPullConsumer consumer = + new SubscriptionPullConsumer.Builder() + .host(host) + .port(port) + .consumerId("c_remove_user_data") + .consumerGroupId("cg_remove_user_data") + .autoCommit(false) + .buildPullConsumer()) { + consumer.open(); + consumer.subscribe(topicName); + + final List messages = pollMessages(consumer); + Assert.assertFalse(messages.isEmpty()); + + for (final SubscriptionMessage message : messages) { + Assert.assertNotNull(message.getCommitContext()); + Assert.assertFalse(message.getResultSets().isEmpty()); + + message.removeUserData(); + + Assert.assertNotNull(message.getCommitContext()); + Assert.assertThrows(SubscriptionRuntimeException.class, message::getResultSets); + Assert.assertThrows(SubscriptionRuntimeException.class, message::getRecordTabletIterator); + message.removeUserData(); + } + + consumer.commitSync(messages); + consumer.unsubscribe(topicName); + } + } + + @Ignore + @Test + public void testPullConsumerAutoCommitStoresCommitContextsOnly() throws Exception { + final String topicName = "topic_auto_commit_context_only"; + insertHistoricalData(100, 200); + createTopic(topicName); + + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + try (final SubscriptionPullConsumer consumer = + new SubscriptionPullConsumer.Builder() + .host(host) + .port(port) + .consumerId("c_auto_commit") + .consumerGroupId("cg_auto_commit") + .autoCommit(true) + .autoCommitIntervalMs(60_000L) + .buildPullConsumer()) { + consumer.open(); + consumer.subscribe(topicName); + + final List messages = pollMessages(consumer); + Assert.assertFalse(messages.isEmpty()); + messages.forEach(SubscriptionMessage::removeUserData); + + final SortedMap> uncommittedCommitContexts = + getUncommittedCommitContexts(consumer); + Assert.assertFalse(uncommittedCommitContexts.isEmpty()); + + final Object storedObject = + uncommittedCommitContexts.values().iterator().next().iterator().next(); + Assert.assertTrue(storedObject instanceof SubscriptionCommitContext); + Assert.assertFalse(storedObject instanceof SubscriptionMessage); + + consumer.unsubscribe(topicName); + } + } + + private void insertHistoricalData(final int start, final int end) { + try (final ISession session = EnvFactory.getEnv().getSessionConnection()) { + for (int i = start; i < end; ++i) { + session.executeNonQueryStatement( + String.format("insert into root.db.d1(time, s1) values (%s, 1)", i)); + } + session.executeNonQueryStatement("flush"); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private void createTopic(final String topicName) { + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + session.createTopic(topicName); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private List pollMessages(final SubscriptionPullConsumer consumer) + throws Exception { + for (int i = 0; i < 10; ++i) { + final List messages = + consumer.poll(Duration.ofMillis(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS)); + if (!messages.isEmpty()) { + return messages; + } + LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); + } + fail("Failed to poll subscription messages within the expected timeout."); + throw new IllegalStateException("unreachable"); + } + + @SuppressWarnings("unchecked") + private SortedMap> getUncommittedCommitContexts( + final SubscriptionPullConsumer consumer) throws Exception { + final Field field = + SubscriptionPullConsumer.class.getDeclaredField("uncommittedCommitContexts"); + field.setAccessible(true); + return (SortedMap>) field.get(consumer); + } +} diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java index df12472f78fa2..92c6b2c9a3e29 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java @@ -1120,22 +1120,40 @@ private List pollTabletsInternal( /////////////////////////////// commit sync (ack & nack) /////////////////////////////// protected void ack(final Iterable messages) throws SubscriptionException { + ackCommitContexts(extractCommitContexts(messages)); + } + + protected void ackCommitContexts(final Iterable commitContexts) + throws SubscriptionException { + commit(commitContexts, false); + } + + private Iterable extractCommitContexts( + final Iterable messages) { + final List commitContexts = new ArrayList<>(); + for (final SubscriptionMessage message : messages) { + commitContexts.add(message.getCommitContext()); + } + return commitContexts; + } + + private void commit(final Iterable commitContexts, final boolean nack) + throws SubscriptionException { final Map> dataNodeIdToSubscriptionCommitContexts = new HashMap<>(); - for (final SubscriptionMessage message : messages) { + for (final SubscriptionCommitContext commitContext : commitContexts) { dataNodeIdToSubscriptionCommitContexts - .computeIfAbsent(message.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>()) - .add(message.getCommitContext()); + .computeIfAbsent(commitContext.getDataNodeId(), (id) -> new ArrayList<>()) + .add(commitContext); } for (final Entry> entry : dataNodeIdToSubscriptionCommitContexts.entrySet()) { - commitInternal(entry.getKey(), entry.getValue(), false); + commitInternal(entry.getKey(), entry.getValue(), nack); } } protected void nack(final Iterable messages) throws SubscriptionException { - final Map> dataNodeIdToSubscriptionCommitContexts = - new HashMap<>(); + final List commitContexts = new ArrayList<>(); for (final SubscriptionMessage message : messages) { // make every effort to delete stale intermediate file if (Objects.equals(SubscriptionMessageType.TS_FILE.getType(), message.getMessageType()) @@ -1147,29 +1165,18 @@ protected void nack(final Iterable messages) throws Subscri } catch (final Exception ignored) { } } - dataNodeIdToSubscriptionCommitContexts - .computeIfAbsent(message.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>()) - .add(message.getCommitContext()); - } - for (final Entry> entry : - dataNodeIdToSubscriptionCommitContexts.entrySet()) { - commitInternal(entry.getKey(), entry.getValue(), true); + commitContexts.add(message.getCommitContext()); } + commit(commitContexts, true); } private void nack(final List responses) throws SubscriptionException { - final Map> dataNodeIdToSubscriptionCommitContexts = - new HashMap<>(); + final List commitContexts = new ArrayList<>(); for (final SubscriptionPollResponse response : responses) { // there is no stale intermediate file here - dataNodeIdToSubscriptionCommitContexts - .computeIfAbsent(response.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>()) - .add(response.getCommitContext()); - } - for (final Entry> entry : - dataNodeIdToSubscriptionCommitContexts.entrySet()) { - commitInternal(entry.getKey(), entry.getValue(), true); + commitContexts.add(response.getCommitContext()); } + commit(commitContexts, true); } private void commitInternal( diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java index c1dd131490ee4..a77716fe02cdf 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java @@ -21,6 +21,7 @@ import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; import org.apache.iotdb.session.subscription.util.CollectionUtils; import org.apache.iotdb.session.subscription.util.IdentifierUtils; @@ -62,7 +63,7 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer { private final boolean autoCommit; private final long autoCommitIntervalMs; - private SortedMap> uncommittedMessages; + private SortedMap> uncommittedCommitContexts; private final AtomicBoolean isClosed = new AtomicBoolean(true); @@ -117,7 +118,7 @@ public synchronized void open() throws SubscriptionException { // submit auto poll worker if enabling auto commit if (autoCommit) { - uncommittedMessages = new ConcurrentSkipListMap<>(); + uncommittedCommitContexts = new ConcurrentSkipListMap<>(); submitAutoCommitWorker(); } } @@ -195,9 +196,12 @@ public List poll(final Set topicNames, final long t if (currentTimestamp % autoCommitIntervalMs == 0) { index -= 1; } - uncommittedMessages + uncommittedCommitContexts .computeIfAbsent(index, o -> new ConcurrentSkipListSet<>()) - .addAll(messages); + .addAll( + messages.stream() + .map(SubscriptionMessage::getCommitContext) + .collect(Collectors.toList())); } return messages; @@ -264,11 +268,11 @@ public void run() { index -= 1; } - for (final Map.Entry> entry : - uncommittedMessages.headMap(index).entrySet()) { + for (final Map.Entry> entry : + uncommittedCommitContexts.headMap(index).entrySet()) { try { - ack(entry.getValue()); - uncommittedMessages.remove(entry.getKey()); + ackCommitContexts(entry.getValue()); + uncommittedCommitContexts.remove(entry.getKey()); } catch (final Exception e) { LOGGER.warn("something unexpected happened when auto commit messages...", e); } @@ -277,10 +281,11 @@ public void run() { } private void commitAllUncommittedMessages() { - for (final Map.Entry> entry : uncommittedMessages.entrySet()) { + for (final Map.Entry> entry : + uncommittedCommitContexts.entrySet()) { try { - ack(entry.getValue()); - uncommittedMessages.remove(entry.getKey()); + ackCommitContexts(entry.getValue()); + uncommittedCommitContexts.remove(entry.getKey()); } catch (final Exception e) { LOGGER.warn("something unexpected happened when commit messages during close", e); } @@ -421,7 +426,7 @@ protected Map allReportMessage() { allReportMessage.put("autoCommit", String.valueOf(autoCommit)); allReportMessage.put("autoCommitIntervalMs", String.valueOf(autoCommitIntervalMs)); if (autoCommit) { - allReportMessage.put("uncommittedMessages", uncommittedMessages.toString()); + allReportMessage.put("uncommittedCommitContexts", uncommittedCommitContexts.toString()); } return allReportMessage; } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java index 736d9d0c1b718..81074b55b2826 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java @@ -20,6 +20,7 @@ package org.apache.iotdb.session.subscription.payload; import org.apache.iotdb.rpc.subscription.exception.SubscriptionIncompatibleHandlerException; +import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeException; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; import org.apache.tsfile.write.record.Tablet; @@ -36,6 +37,8 @@ public class SubscriptionMessage implements Comparable { private final SubscriptionMessageHandler handler; + private volatile boolean userDataRemoved = false; + public SubscriptionMessage( final SubscriptionCommitContext commitContext, final List tablets) { this.commitContext = commitContext; @@ -58,6 +61,17 @@ public short getMessageType() { return messageType; } + public void removeUserData() { + if (userDataRemoved) { + return; + } + + handler.removeUserData(); + if (handler instanceof SubscriptionRecordHandler) { + userDataRemoved = true; + } + } + /////////////////////////////// override /////////////////////////////// @Override @@ -96,6 +110,7 @@ public String toString() { /////////////////////////////// handlers /////////////////////////////// public List getResultSets() { + ensureUserDataAvailable(); if (handler instanceof SubscriptionRecordHandler) { return ((SubscriptionRecordHandler) handler).getResultSets(); } @@ -104,6 +119,7 @@ public List getResultSets() { } public Iterator getRecordTabletIterator() { + ensureUserDataAvailable(); if (handler instanceof SubscriptionRecordHandler) { return ((SubscriptionRecordHandler) handler) .getResultSets().stream() @@ -122,4 +138,11 @@ public SubscriptionTsFileHandler getTsFile() { throw new SubscriptionIncompatibleHandlerException( String.format("%s do not support getTsFile().", handler.getClass().getSimpleName())); } + + private void ensureUserDataAvailable() { + if (userDataRemoved) { + throw new SubscriptionRuntimeException( + String.format("User data has been removed from %s.", getClass().getSimpleName())); + } + } } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessageHandler.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessageHandler.java index 275f89c0d14a6..c6d397322473a 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessageHandler.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessageHandler.java @@ -19,4 +19,7 @@ package org.apache.iotdb.session.subscription.payload; -public interface SubscriptionMessageHandler {} +public interface SubscriptionMessageHandler { + + default void removeUserData() {} +} diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionRecordHandler.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionRecordHandler.java index c7b67c796ffea..d2ba5e98c331a 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionRecordHandler.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionRecordHandler.java @@ -20,6 +20,7 @@ package org.apache.iotdb.session.subscription.payload; import org.apache.iotdb.isession.ISessionDataSet; +import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeException; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.Field; @@ -64,6 +65,11 @@ public Iterator iterator() { return resultSets.iterator(); } + @Override + public void removeUserData() { + resultSets.forEach(SubscriptionResultSet::removeUserData); + } + public static class SubscriptionResultSet implements ISessionDataSet { private Tablet tablet; @@ -76,17 +82,21 @@ public static class SubscriptionResultSet implements ISessionDataSet { private List columnTypeList; + private volatile boolean userDataRemoved = false; + private SubscriptionResultSet(final Tablet tablet) { this.tablet = tablet; this.sortedRowPositions = generateSortedRowPositions(tablet); } public Tablet getTablet() { + ensureUserDataAvailable(); return tablet; } @Override public List getColumnNames() { + ensureUserDataAvailable(); if (Objects.nonNull(columnNameList)) { return columnNameList; } @@ -105,6 +115,7 @@ public List getColumnNames() { @Override public List getColumnTypes() { + ensureUserDataAvailable(); if (Objects.nonNull(columnTypeList)) { return columnTypeList; } @@ -119,11 +130,13 @@ public List getColumnTypes() { } public boolean hasNext() { + ensureUserDataAvailable(); return Objects.nonNull(tablet) && rowIndex + 1 < sortedRowPositions.size(); } @Override public RowRecord next() { + ensureUserDataAvailable(); final RowPosition position = sortedRowPositions.get(++rowIndex); return generateRowRecord(position.timestamp, position.rowIndex); } @@ -133,6 +146,16 @@ public void close() { tablet = null; } + private void removeUserData() { + if (userDataRemoved) { + return; + } + + userDataRemoved = true; + sortedRowPositions.clear(); + close(); + } + private RowRecord generateRowRecord(final long timestamp, final int rowPosition) { final int columnSize = tablet.getSchemas().size(); final List fields = new ArrayList<>(columnSize); @@ -211,5 +234,12 @@ private RowPosition(final long timestamp, final int rowIndex) { this.rowIndex = rowIndex; } } + + private void ensureUserDataAvailable() { + if (userDataRemoved) { + throw new SubscriptionRuntimeException( + String.format("User data has been removed from %s.", getClass().getSimpleName())); + } + } } }