Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.subscription.it.local;

import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeException;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.session.subscription.SubscriptionSession;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.locks.LockSupport;

import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class})
public class IoTDBSubscriptionMessageIT extends AbstractSubscriptionLocalIT {

@Override
@Before
public void setUp() throws Exception {
super.setUp();
}

@Ignore
@Test
public void testPullConsumerCommitAfterRemoveUserData() throws Exception {
final String topicName = "topic_remove_user_data";
insertHistoricalData(0, 100);
createTopic(topicName);

final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
try (final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(host)
.port(port)
.consumerId("c_remove_user_data")
.consumerGroupId("cg_remove_user_data")
.autoCommit(false)
.buildPullConsumer()) {
consumer.open();
consumer.subscribe(topicName);

final List<SubscriptionMessage> messages = pollMessages(consumer);
Assert.assertFalse(messages.isEmpty());

for (final SubscriptionMessage message : messages) {
Assert.assertNotNull(message.getCommitContext());
Assert.assertFalse(message.getResultSets().isEmpty());

message.removeUserData();

Assert.assertNotNull(message.getCommitContext());
Assert.assertThrows(SubscriptionRuntimeException.class, message::getResultSets);
Assert.assertThrows(SubscriptionRuntimeException.class, message::getRecordTabletIterator);
message.removeUserData();
}

consumer.commitSync(messages);
consumer.unsubscribe(topicName);
}
}

@Ignore
@Test
public void testPullConsumerAutoCommitStoresCommitContextsOnly() throws Exception {
final String topicName = "topic_auto_commit_context_only";
insertHistoricalData(100, 200);
createTopic(topicName);

final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
try (final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(host)
.port(port)
.consumerId("c_auto_commit")
.consumerGroupId("cg_auto_commit")
.autoCommit(true)
.autoCommitIntervalMs(60_000L)
.buildPullConsumer()) {
consumer.open();
consumer.subscribe(topicName);

final List<SubscriptionMessage> messages = pollMessages(consumer);
Assert.assertFalse(messages.isEmpty());
messages.forEach(SubscriptionMessage::removeUserData);

final SortedMap<Long, Set<SubscriptionCommitContext>> uncommittedCommitContexts =
getUncommittedCommitContexts(consumer);
Assert.assertFalse(uncommittedCommitContexts.isEmpty());

final Object storedObject =
uncommittedCommitContexts.values().iterator().next().iterator().next();
Assert.assertTrue(storedObject instanceof SubscriptionCommitContext);
Assert.assertFalse(storedObject instanceof SubscriptionMessage);

consumer.unsubscribe(topicName);
}
}

private void insertHistoricalData(final int start, final int end) {
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
for (int i = start; i < end; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

private void createTopic(final String topicName) {
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
session.createTopic(topicName);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

private List<SubscriptionMessage> pollMessages(final SubscriptionPullConsumer consumer)
throws Exception {
for (int i = 0; i < 10; ++i) {
final List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS));
if (!messages.isEmpty()) {
return messages;
}
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
}
fail("Failed to poll subscription messages within the expected timeout.");
throw new IllegalStateException("unreachable");
}

@SuppressWarnings("unchecked")
private SortedMap<Long, Set<SubscriptionCommitContext>> getUncommittedCommitContexts(
final SubscriptionPullConsumer consumer) throws Exception {
final Field field =
SubscriptionPullConsumer.class.getDeclaredField("uncommittedCommitContexts");
field.setAccessible(true);
return (SortedMap<Long, Set<SubscriptionCommitContext>>) field.get(consumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1120,22 +1120,40 @@ private List<SubscriptionPollResponse> pollTabletsInternal(
/////////////////////////////// commit sync (ack & nack) ///////////////////////////////

protected void ack(final Iterable<SubscriptionMessage> messages) throws SubscriptionException {
ackCommitContexts(extractCommitContexts(messages));
}

protected void ackCommitContexts(final Iterable<SubscriptionCommitContext> commitContexts)
throws SubscriptionException {
commit(commitContexts, false);
}

private Iterable<SubscriptionCommitContext> extractCommitContexts(
final Iterable<SubscriptionMessage> messages) {
final List<SubscriptionCommitContext> commitContexts = new ArrayList<>();
for (final SubscriptionMessage message : messages) {
commitContexts.add(message.getCommitContext());
}
return commitContexts;
}

private void commit(final Iterable<SubscriptionCommitContext> commitContexts, final boolean nack)
throws SubscriptionException {
final Map<Integer, List<SubscriptionCommitContext>> dataNodeIdToSubscriptionCommitContexts =
new HashMap<>();
for (final SubscriptionMessage message : messages) {
for (final SubscriptionCommitContext commitContext : commitContexts) {
dataNodeIdToSubscriptionCommitContexts
.computeIfAbsent(message.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>())
.add(message.getCommitContext());
.computeIfAbsent(commitContext.getDataNodeId(), (id) -> new ArrayList<>())
.add(commitContext);
}
for (final Entry<Integer, List<SubscriptionCommitContext>> entry :
dataNodeIdToSubscriptionCommitContexts.entrySet()) {
commitInternal(entry.getKey(), entry.getValue(), false);
commitInternal(entry.getKey(), entry.getValue(), nack);
}
}

protected void nack(final Iterable<SubscriptionMessage> messages) throws SubscriptionException {
final Map<Integer, List<SubscriptionCommitContext>> dataNodeIdToSubscriptionCommitContexts =
new HashMap<>();
final List<SubscriptionCommitContext> commitContexts = new ArrayList<>();
for (final SubscriptionMessage message : messages) {
// make every effort to delete stale intermediate file
if (Objects.equals(SubscriptionMessageType.TS_FILE.getType(), message.getMessageType())
Expand All @@ -1147,29 +1165,18 @@ protected void nack(final Iterable<SubscriptionMessage> messages) throws Subscri
} catch (final Exception ignored) {
}
}
dataNodeIdToSubscriptionCommitContexts
.computeIfAbsent(message.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>())
.add(message.getCommitContext());
}
for (final Entry<Integer, List<SubscriptionCommitContext>> entry :
dataNodeIdToSubscriptionCommitContexts.entrySet()) {
commitInternal(entry.getKey(), entry.getValue(), true);
commitContexts.add(message.getCommitContext());
}
commit(commitContexts, true);
}

private void nack(final List<SubscriptionPollResponse> responses) throws SubscriptionException {
final Map<Integer, List<SubscriptionCommitContext>> dataNodeIdToSubscriptionCommitContexts =
new HashMap<>();
final List<SubscriptionCommitContext> commitContexts = new ArrayList<>();
for (final SubscriptionPollResponse response : responses) {
// there is no stale intermediate file here
dataNodeIdToSubscriptionCommitContexts
.computeIfAbsent(response.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>())
.add(response.getCommitContext());
}
for (final Entry<Integer, List<SubscriptionCommitContext>> entry :
dataNodeIdToSubscriptionCommitContexts.entrySet()) {
commitInternal(entry.getKey(), entry.getValue(), true);
commitContexts.add(response.getCommitContext());
}
commit(commitContexts, true);
}

private void commitInternal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.util.CollectionUtils;
import org.apache.iotdb.session.subscription.util.IdentifierUtils;
Expand Down Expand Up @@ -62,7 +63,7 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer {
private final boolean autoCommit;
private final long autoCommitIntervalMs;

private SortedMap<Long, Set<SubscriptionMessage>> uncommittedMessages;
private SortedMap<Long, Set<SubscriptionCommitContext>> uncommittedCommitContexts;

private final AtomicBoolean isClosed = new AtomicBoolean(true);

Expand Down Expand Up @@ -117,7 +118,7 @@ public synchronized void open() throws SubscriptionException {

// submit auto poll worker if enabling auto commit
if (autoCommit) {
uncommittedMessages = new ConcurrentSkipListMap<>();
uncommittedCommitContexts = new ConcurrentSkipListMap<>();
submitAutoCommitWorker();
}
}
Expand Down Expand Up @@ -195,9 +196,12 @@ public List<SubscriptionMessage> poll(final Set<String> topicNames, final long t
if (currentTimestamp % autoCommitIntervalMs == 0) {
index -= 1;
}
uncommittedMessages
uncommittedCommitContexts
.computeIfAbsent(index, o -> new ConcurrentSkipListSet<>())
.addAll(messages);
.addAll(
messages.stream()
.map(SubscriptionMessage::getCommitContext)
.collect(Collectors.toList()));
}

return messages;
Expand Down Expand Up @@ -264,11 +268,11 @@ public void run() {
index -= 1;
}

for (final Map.Entry<Long, Set<SubscriptionMessage>> entry :
uncommittedMessages.headMap(index).entrySet()) {
for (final Map.Entry<Long, Set<SubscriptionCommitContext>> entry :
uncommittedCommitContexts.headMap(index).entrySet()) {
try {
ack(entry.getValue());
uncommittedMessages.remove(entry.getKey());
ackCommitContexts(entry.getValue());
uncommittedCommitContexts.remove(entry.getKey());
} catch (final Exception e) {
LOGGER.warn("something unexpected happened when auto commit messages...", e);
}
Expand All @@ -277,10 +281,11 @@ public void run() {
}

private void commitAllUncommittedMessages() {
for (final Map.Entry<Long, Set<SubscriptionMessage>> entry : uncommittedMessages.entrySet()) {
for (final Map.Entry<Long, Set<SubscriptionCommitContext>> entry :
uncommittedCommitContexts.entrySet()) {
try {
ack(entry.getValue());
uncommittedMessages.remove(entry.getKey());
ackCommitContexts(entry.getValue());
uncommittedCommitContexts.remove(entry.getKey());
} catch (final Exception e) {
LOGGER.warn("something unexpected happened when commit messages during close", e);
}
Expand Down Expand Up @@ -421,7 +426,7 @@ protected Map<String, String> allReportMessage() {
allReportMessage.put("autoCommit", String.valueOf(autoCommit));
allReportMessage.put("autoCommitIntervalMs", String.valueOf(autoCommitIntervalMs));
if (autoCommit) {
allReportMessage.put("uncommittedMessages", uncommittedMessages.toString());
allReportMessage.put("uncommittedCommitContexts", uncommittedCommitContexts.toString());
}
return allReportMessage;
}
Expand Down
Loading
Loading