From c113e8f545fd6107c77b1b0dde862c485f5e1550 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Tue, 14 Apr 2026 20:53:31 +0800 Subject: [PATCH 1/2] feat(subscription): support encrypted password auth --- .../IoTDBEncryptedPasswordPullConsumerIT.java | 174 ++++++++++++++++++ .../subscription/config/ConsumerConfig.java | 4 + .../subscription/config/ConsumerConstant.java | 1 + .../base/AbstractSubscriptionConsumer.java | 5 + .../AbstractSubscriptionConsumerBuilder.java | 6 + .../base/AbstractSubscriptionProvider.java | 6 + ...stractSubscriptionPullConsumerBuilder.java | 6 + ...stractSubscriptionPushConsumerBuilder.java | 6 + .../table/SubscriptionTableProvider.java | 2 + .../table/SubscriptionTablePullConsumer.java | 2 + .../SubscriptionTablePullConsumerBuilder.java | 6 + .../table/SubscriptionTablePushConsumer.java | 2 + .../SubscriptionTablePushConsumerBuilder.java | 6 + .../tree/SubscriptionTreeProvider.java | 2 + .../tree/SubscriptionTreePullConsumer.java | 9 + .../SubscriptionTreePullConsumerBuilder.java | 6 + .../tree/SubscriptionTreePushConsumer.java | 9 + .../SubscriptionTreePushConsumerBuilder.java | 6 + .../impl/pipe/task/CreatePipeProcedureV2.java | 37 ++-- .../CreateSubscriptionProcedure.java | 3 +- .../pipe/task/CreatePipeProcedureV2Test.java | 59 ++++++ .../meta/consumer/ConsumerMeta.java | 8 + .../subscription/meta/topic/TopicMeta.java | 8 + .../subscription/topic/TopicDeSerTest.java | 11 ++ 24 files changed, 371 insertions(+), 13 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java new file mode 100644 index 0000000000000..0b9207cf6c587 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.subscription.it.triple.treemodel.regression.param; + +import org.apache.iotdb.commons.utils.AuthUtils; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionTreeRegressionMisc; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer; +import org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT; + +import org.apache.thrift.TException; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Ignore("TODO: enable after encrypted password subscription IT stabilizes") +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2SubscriptionTreeRegressionMisc.class}) +public class IoTDBEncryptedPasswordPullConsumerIT extends AbstractSubscriptionTreeRegressionIT { + + private static final String DATABASE = "root.TestEncryptedPasswordPullConsumer"; + private static final String DEVICE = DATABASE + ".d_0"; + private static final String TOPIC_NAME = "TestEncryptedPasswordPullConsumerTopic"; + private static final String USERNAME = "encrypted_user"; + private static final String PASSWORD = "encrypted_user_123"; + private static final String ENCRYPTED_PASSWORD = AuthUtils.encryptPassword(PASSWORD); + private static final String WRONG_ENCRYPTED_PASSWORD = + AuthUtils.encryptPassword("wrong_encrypted_user_123"); + + private static final List SCHEMA_LIST = new ArrayList<>(); + + static { + SCHEMA_LIST.add(new MeasurementSchema("s_0", TSDataType.INT64)); + SCHEMA_LIST.add(new MeasurementSchema("s_1", TSDataType.DOUBLE)); + } + + private SubscriptionTreePullConsumer consumer; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + createDB(DATABASE); + createTopic_s(TOPIC_NAME, "root.**", null, null, false); + session_src.createTimeseries( + DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, CompressionType.LZ4); + session_src.createTimeseries( + DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, CompressionType.LZ4); + session_dest.createTimeseries( + DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, CompressionType.LZ4); + session_dest.createTimeseries( + DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, CompressionType.LZ4); + session_src.executeNonQueryStatement("create user " + USERNAME + " '" + PASSWORD + "'"); + session_src.executeNonQueryStatement("grant read,write on root.** to user " + USERNAME); + assertTrue(subs.getTopic(TOPIC_NAME).isPresent()); + } + + @Override + @After + public void tearDown() throws Exception { + try { + if (consumer != null) { + consumer.close(); + } + } catch (final Exception ignored) { + } + try { + subs.dropTopic(TOPIC_NAME); + } catch (final Exception ignored) { + } + try { + session_src.executeNonQueryStatement("drop user " + USERNAME); + } catch (final Exception ignored) { + } + dropDB(DATABASE); + super.tearDown(); + } + + @Test + public void testSubscribeWithEncryptedPassword() + throws TException, + IoTDBConnectionException, + IOException, + StatementExecutionException, + InterruptedException { + consumer = createConsumer("encrypted-password-group", ENCRYPTED_PASSWORD); + + consumer.open(); + consumer.subscribe(TOPIC_NAME); + assertEquals(1, subs.getSubscriptions().size(), "subscribe with encrypted password"); + + insertData(1706659200000L); + consume_data(consumer, session_dest); + check_count( + 4, + "select count(s_0) from " + DEVICE + " where time >= 1706659200000", + "encrypted password consumption"); + } + + @Test + public void testSubscribeFailsWithWrongEncryptedPassword() + throws IoTDBConnectionException, StatementExecutionException { + consumer = createConsumer("wrong-encrypted-password-group", WRONG_ENCRYPTED_PASSWORD); + + try { + consumer.open(); + consumer.subscribe(TOPIC_NAME); + fail("subscribe should fail when encrypted password mismatches"); + } catch (final Exception ignored) { + assertTrue(subs.getSubscriptions().isEmpty()); + } + } + + private SubscriptionTreePullConsumer createConsumer( + final String consumerGroupId, final String encryptedPassword) { + return new SubscriptionTreePullConsumer.Builder() + .host(SRC_HOST) + .port(SRC_PORT) + .username(USERNAME) + .password(PASSWORD) + .encryptedPassword(encryptedPassword) + .consumerId("consumer_" + consumerGroupId) + .consumerGroupId(consumerGroupId) + .buildPullConsumer(); + } + + private void insertData(long timestamp) + throws IoTDBConnectionException, StatementExecutionException { + final Tablet tablet = new Tablet(DEVICE, SCHEMA_LIST, 10); + for (int row = 0; row < 5; row++) { + final int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, timestamp); + tablet.addValue("s_0", rowIndex, row * 20L + row); + tablet.addValue("s_1", rowIndex, row + 2.45); + timestamp += row * 2000; + } + session_src.insertTablet(tablet); + } +} diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java index 0b5c5a5547794..3cb0087d6827e 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java @@ -76,6 +76,10 @@ public String getPassword() { return getString(ConsumerConstant.PASSWORD_KEY); } + public String getEncryptedPassword() { + return getString(ConsumerConstant.ENCRYPTED_PASSWORD_KEY); + } + public String getSqlDialect() { return getString(ConsumerConstant.SQL_DIALECT_KEY); } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java index a0e6f9ed22801..9c52c8dd7da90 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java @@ -34,6 +34,7 @@ public class ConsumerConstant { public static final String USERNAME_KEY = "username"; public static final String PASSWORD_KEY = "password"; + public static final String ENCRYPTED_PASSWORD_KEY = "encrypted-password"; public static final String CONSUMER_ID_KEY = "consumer-id"; public static final String CONSUMER_GROUP_ID_KEY = "group-id"; diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java index e9fbb1672e563..62c8d20e3894d 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java @@ -100,6 +100,7 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { private final String username; private final String password; + private final String encryptedPassword; protected String consumerId; protected String consumerGroupId; @@ -177,6 +178,7 @@ protected AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder this.username = builder.username; this.password = builder.password; + this.encryptedPassword = builder.encryptedPassword; this.consumerId = builder.consumerId; this.consumerGroupId = builder.consumerGroupId; @@ -207,6 +209,7 @@ protected AbstractSubscriptionConsumer( (String) properties.getOrDefault( ConsumerConstant.PASSWORD_KEY, SessionConfig.DEFAULT_PASSWORD)) + .encryptedPassword((String) properties.get(ConsumerConstant.ENCRYPTED_PASSWORD_KEY)) .consumerId((String) properties.get(ConsumerConstant.CONSUMER_ID_KEY)) .consumerGroupId((String) properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY)) .heartbeatIntervalMs( @@ -387,6 +390,7 @@ protected abstract AbstractSubscriptionProvider constructSubscriptionProvider( final TEndPoint endPoint, final String username, final String password, + final String encryptedPassword, final String consumerId, final String consumerGroupId, final int thriftMaxFrameSize, @@ -400,6 +404,7 @@ AbstractSubscriptionProvider constructProviderAndHandshake(final TEndPoint endPo endPoint, this.username, this.password, + this.encryptedPassword, this.consumerId, this.consumerGroupId, this.thriftMaxFrameSize, diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java index 81bfb6241c902..ecb5e0ffc36ba 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java @@ -36,6 +36,7 @@ public class AbstractSubscriptionConsumerBuilder { protected String username = SessionConfig.DEFAULT_USER; protected String password = SessionConfig.DEFAULT_PASSWORD; + protected String encryptedPassword; protected String consumerId; protected String consumerGroupId; @@ -76,6 +77,11 @@ public AbstractSubscriptionConsumerBuilder password(final String password) { return this; } + public AbstractSubscriptionConsumerBuilder encryptedPassword(final String encryptedPassword) { + this.encryptedPassword = encryptedPassword; + return this; + } + public AbstractSubscriptionConsumerBuilder consumerId(@Nullable final String consumerId) { if (Objects.isNull(consumerId)) { return this; diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java index 7f3582d195d6a..413c609abbff3 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java @@ -90,6 +90,7 @@ public abstract class AbstractSubscriptionProvider { private final String username; private final String password; + private final String encryptedPassword; private final long heartbeatIntervalMs; private final int connectionTimeoutInMs; @@ -105,6 +106,7 @@ protected AbstractSubscriptionProvider( final TEndPoint endPoint, final String username, final String password, + final String encryptedPassword, final String consumerId, final String consumerGroupId, final int thriftMaxFrameSize, @@ -124,6 +126,7 @@ protected AbstractSubscriptionProvider( this.consumerGroupId = consumerGroupId; this.username = username; this.password = password; + this.encryptedPassword = encryptedPassword; this.heartbeatIntervalMs = heartbeatIntervalMs; this.connectionTimeoutInMs = connectionTimeoutInMs; } @@ -175,6 +178,9 @@ synchronized void handshake() throws SubscriptionException, IoTDBConnectionExcep consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId); consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username); consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password); + if (encryptedPassword != null) { + consumerAttributes.put(ConsumerConstant.ENCRYPTED_PASSWORD_KEY, encryptedPassword); + } consumerAttributes.put(ConsumerConstant.SQL_DIALECT_KEY, session.getSqlDialect()); consumerAttributes.put( ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs)); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java index 7083a7dc4af60..2fac3d500eb10 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java @@ -58,6 +58,12 @@ public AbstractSubscriptionPullConsumerBuilder password(final String password) { return this; } + @Override + public AbstractSubscriptionPullConsumerBuilder encryptedPassword(final String encryptedPassword) { + super.encryptedPassword(encryptedPassword); + return this; + } + @Override public AbstractSubscriptionPullConsumerBuilder consumerId(final String consumerId) { super.consumerId(consumerId); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java index f013b98dd1951..bcd33812c8e09 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java @@ -64,6 +64,12 @@ public AbstractSubscriptionPushConsumerBuilder password(final String password) { return this; } + @Override + public AbstractSubscriptionPushConsumerBuilder encryptedPassword(final String encryptedPassword) { + super.encryptedPassword(encryptedPassword); + return this; + } + @Override public AbstractSubscriptionPushConsumerBuilder consumerId(final String consumerId) { super.consumerId(consumerId); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java index 1b90866db9e37..84470d283c21b 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java @@ -30,6 +30,7 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider { final TEndPoint endPoint, final String username, final String password, + final String encryptedPassword, final String consumerId, final String consumerGroupId, final int thriftMaxFrameSize, @@ -39,6 +40,7 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider { endPoint, username, password, + encryptedPassword, consumerId, consumerGroupId, thriftMaxFrameSize, diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java index 83dd39aebbf7d..8f712782fb5f0 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java @@ -42,6 +42,7 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final TEndPoint endPoint, final String username, final String password, + final String encryptedPassword, final String consumerId, final String consumerGroupId, final int thriftMaxFrameSize, @@ -51,6 +52,7 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( endPoint, username, password, + encryptedPassword, consumerId, consumerGroupId, thriftMaxFrameSize, diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java index 6d8437ac95f03..939228a7f49e9 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java @@ -56,6 +56,12 @@ public SubscriptionTablePullConsumerBuilder password(final String password) { return this; } + @Override + public SubscriptionTablePullConsumerBuilder encryptedPassword(final String encryptedPassword) { + super.encryptedPassword(encryptedPassword); + return this; + } + @Override public SubscriptionTablePullConsumerBuilder consumerId(final String consumerId) { super.consumerId(consumerId); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java index ac44e421dac57..e90afc1d8d175 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java @@ -38,6 +38,7 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final TEndPoint endPoint, final String username, final String password, + final String encryptedPassword, final String consumerId, final String consumerGroupId, final int thriftMaxFrameSize, @@ -47,6 +48,7 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( endPoint, username, password, + encryptedPassword, consumerId, consumerGroupId, thriftMaxFrameSize, diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java index c372c586db394..27bf328fea9e6 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java @@ -58,6 +58,12 @@ public SubscriptionTablePushConsumerBuilder password(final String password) { return this; } + @Override + public SubscriptionTablePushConsumerBuilder encryptedPassword(final String encryptedPassword) { + super.encryptedPassword(encryptedPassword); + return this; + } + @Override public SubscriptionTablePushConsumerBuilder consumerId(final String consumerId) { super.consumerId(consumerId); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java index c79b64e8c846f..3589fbbcf749a 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java @@ -30,6 +30,7 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider { final TEndPoint endPoint, final String username, final String password, + final String encryptedPassword, final String consumerId, final String consumerGroupId, final int thriftMaxFrameSize, @@ -39,6 +40,7 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider { endPoint, username, password, + encryptedPassword, consumerId, consumerGroupId, thriftMaxFrameSize, diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java index 23050893f660d..7225036aaa4cf 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java @@ -49,6 +49,7 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final TEndPoint endPoint, final String username, final String password, + final String encryptedPassword, final String consumerId, final String consumerGroupId, final int thriftMaxFrameSize, @@ -58,6 +59,7 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( endPoint, username, password, + encryptedPassword, consumerId, consumerGroupId, thriftMaxFrameSize, @@ -80,6 +82,7 @@ private SubscriptionTreePullConsumer(final SubscriptionTreePullConsumer.Builder .nodeUrls(builder.nodeUrls) .username(builder.username) .password(builder.password) + .encryptedPassword(builder.encryptedPassword) .consumerId(builder.consumerId) .consumerGroupId(builder.consumerGroupId) .heartbeatIntervalMs(builder.heartbeatIntervalMs) @@ -231,6 +234,7 @@ public static class Builder { private String username = SessionConfig.DEFAULT_USER; private String password = SessionConfig.DEFAULT_PASSWORD; + private String encryptedPassword; private String consumerId; private String consumerGroupId; @@ -274,6 +278,11 @@ public Builder password(final String password) { return this; } + public Builder encryptedPassword(final String encryptedPassword) { + this.encryptedPassword = encryptedPassword; + return this; + } + public Builder consumerId(@Nullable final String consumerId) { if (Objects.isNull(consumerId)) { return this; diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java index 8623a49208776..cbceb95d77f90 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java @@ -56,6 +56,12 @@ public SubscriptionTreePullConsumerBuilder password(final String password) { return this; } + @Override + public SubscriptionTreePullConsumerBuilder encryptedPassword(final String encryptedPassword) { + super.encryptedPassword(encryptedPassword); + return this; + } + @Override public SubscriptionTreePullConsumerBuilder consumerId(final String consumerId) { super.consumerId(consumerId); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java index d56e89d47c86d..4d8a5ef3e169f 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java @@ -48,6 +48,7 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final TEndPoint endPoint, final String username, final String password, + final String encryptedPassword, final String consumerId, final String consumerGroupId, final int thriftMaxFrameSize, @@ -57,6 +58,7 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( endPoint, username, password, + encryptedPassword, consumerId, consumerGroupId, thriftMaxFrameSize, @@ -79,6 +81,7 @@ private SubscriptionTreePushConsumer(final Builder builder) { .nodeUrls(builder.nodeUrls) .username(builder.username) .password(builder.password) + .encryptedPassword(builder.encryptedPassword) .consumerId(builder.consumerId) .consumerGroupId(builder.consumerGroupId) .heartbeatIntervalMs(builder.heartbeatIntervalMs) @@ -185,6 +188,7 @@ public static class Builder { private String username = SessionConfig.DEFAULT_USER; private String password = SessionConfig.DEFAULT_PASSWORD; + private String encryptedPassword; private String consumerId; private String consumerGroupId; @@ -231,6 +235,11 @@ public Builder password(final String password) { return this; } + public Builder encryptedPassword(final String encryptedPassword) { + this.encryptedPassword = encryptedPassword; + return this; + } + public Builder consumerId(@Nullable final String consumerId) { if (Objects.isNull(consumerId)) { return this; diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java index dd0cb01763770..86594433e77e0 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java @@ -58,6 +58,12 @@ public SubscriptionTreePushConsumerBuilder password(final String password) { return this; } + @Override + public SubscriptionTreePushConsumerBuilder encryptedPassword(final String encryptedPassword) { + super.encryptedPassword(encryptedPassword); + return this; + } + @Override public SubscriptionTreePushConsumerBuilder consumerId(final String consumerId) { super.consumerId(consumerId); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index 867d20e078a19..98f6756db2d46 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -46,6 +46,7 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.util.PipeExternalSourceLoadBalancer; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.pipe.api.PipePlugin; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -178,18 +179,30 @@ public static void checkAndEnrichSourceAuthentication( || sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY) || sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY) || sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)) { - final String hashedPassword = - env.getConfigManager() - .getPermissionManager() - .login4Pipe( - sourceParameters.getStringByKeys( - PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY, - PipeSourceConstant.SOURCE_IOTDB_USER_KEY, - PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY, - PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY), - sourceParameters.getStringByKeys( - PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY, - PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + final String username = + sourceParameters.getStringByKeys( + PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY, + PipeSourceConstant.SOURCE_IOTDB_USER_KEY, + PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY, + PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY); + final String password = + sourceParameters.getStringByKeys( + PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY, + PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY); + String hashedPassword = null; + if (Objects.nonNull(password)) { + final TPermissionInfoResp loginResp = + env.getConfigManager().getPermissionManager().login(username, password, true); + if (Objects.nonNull(loginResp) + && Objects.nonNull(loginResp.getStatus()) + && loginResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + hashedPassword = password; + } + } + if (Objects.isNull(hashedPassword)) { + hashedPassword = + env.getConfigManager().getPermissionManager().login4Pipe(username, password); + } if (Objects.isNull(hashedPassword)) { throw new PipeException("Authentication failed."); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java index cb5edd8cd91a3..867f04d02ea88 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java @@ -117,7 +117,8 @@ protected boolean executeFromValidate(final ConfigNodeProcedureEnv env) new TCreatePipeReq() .setPipeName(pipeName) .setExtractorAttributes( - topicMeta.generateExtractorAttributes(consumerMeta.getUsername())) + topicMeta.generateExtractorAttributes( + consumerMeta.getUsername(), consumerMeta.getSubscriptionAuthPassword())) .setProcessorAttributes(topicMeta.generateProcessorAttributes()) .setConnectorAttributes(topicMeta.generateConnectorAttributes(consumerGroupId)), pipeTaskInfo)); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java index 6c095de98529a..4548471743104 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java @@ -19,11 +19,19 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.PermissionManager; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.PublicBAOS; import org.junit.Test; +import org.mockito.Mockito; import java.io.DataOutputStream; import java.nio.ByteBuffer; @@ -68,4 +76,55 @@ public void serializeDeserializeTest() { fail(); } } + + @Test + public void testCheckAndEnrichSourceAuthenticationWithEncryptedPassword() { + final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + final ConfigManager configManager = Mockito.mock(ConfigManager.class); + final PermissionManager permissionManager = Mockito.mock(PermissionManager.class); + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager); + + final TPermissionInfoResp loginResp = new TPermissionInfoResp(); + loginResp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + Mockito.when(permissionManager.login("user", "encrypted-password", true)).thenReturn(loginResp); + + final Map sourceAttributes = new HashMap<>(); + sourceAttributes.put("extractor", "iotdb-source"); + sourceAttributes.put("username", "user"); + sourceAttributes.put("password", "encrypted-password"); + + CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env, sourceAttributes); + + assertEquals("encrypted-password", sourceAttributes.get("password")); + Mockito.verify(permissionManager).login("user", "encrypted-password", true); + Mockito.verify(permissionManager, Mockito.never()) + .login4Pipe(Mockito.anyString(), Mockito.any()); + } + + @Test + public void testCheckAndEnrichSourceAuthenticationFallsBackToRawPassword() { + final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + final ConfigManager configManager = Mockito.mock(ConfigManager.class); + final PermissionManager permissionManager = Mockito.mock(PermissionManager.class); + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager); + + final TPermissionInfoResp loginResp = new TPermissionInfoResp(); + loginResp.setStatus(new TSStatus(TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode())); + Mockito.when(permissionManager.login("user", "raw-password", true)).thenReturn(loginResp); + Mockito.when(permissionManager.login4Pipe("user", "raw-password")) + .thenReturn("hashed-password"); + + final Map sourceAttributes = new HashMap<>(); + sourceAttributes.put(PipeSourceConstant.SOURCE_KEY, "iotdb-source"); + sourceAttributes.put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "user"); + sourceAttributes.put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, "raw-password"); + + CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env, sourceAttributes); + + assertEquals("hashed-password", sourceAttributes.get("password")); + Mockito.verify(permissionManager).login("user", "raw-password", true); + Mockito.verify(permissionManager).login4Pipe("user", "raw-password"); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java index 152f0b111df69..75cca9ccbcdd9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java @@ -66,6 +66,14 @@ public String getPassword() { return config.getPassword(); } + public String getEncryptedPassword() { + return config.getEncryptedPassword(); + } + + public String getSubscriptionAuthPassword() { + return Objects.nonNull(getEncryptedPassword()) ? getEncryptedPassword() : getPassword(); + } + public ByteBuffer serialize() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java index d3836743578a0..badb77d6f486d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java @@ -183,6 +183,11 @@ public static TopicMeta deserialize(final ByteBuffer byteBuffer) { /////////////////////////////// utilities /////////////////////////////// public Map generateExtractorAttributes(final String username) { + return generateExtractorAttributes(username, null); + } + + public Map generateExtractorAttributes( + final String username, final String password) { final Map extractorAttributes = new HashMap<>(); // disable meta sync extractorAttributes.put("source", "iotdb-source"); @@ -190,6 +195,9 @@ public Map generateExtractorAttributes(final String username) { extractorAttributes.put("inclusion.exclusion", "data.delete"); // user extractorAttributes.put("username", username); + if (Objects.nonNull(password)) { + extractorAttributes.put("password", password); + } // TODO: currently set skipif to no-privileges extractorAttributes.put("skipif", "no-privileges"); // sql dialect diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java index 4973f06d27cf2..d9c280e14938c 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java @@ -54,4 +54,15 @@ public void test() throws IOException { Assert.assertEquals( topicMeta.getSubscribedConsumerGroupIds(), topicMeta2.getSubscribedConsumerGroupIds()); } + + @Test + public void testGenerateExtractorAttributesWithEncryptedPassword() { + final TopicMeta topicMeta = new TopicMeta("test_topic", 1, new HashMap<>()); + + final Map extractorAttributes = + topicMeta.generateExtractorAttributes("test_user", "encrypted-password"); + + Assert.assertEquals("test_user", extractorAttributes.get("username")); + Assert.assertEquals("encrypted-password", extractorAttributes.get("password")); + } } From 95eb370cdaa81c6f59f4ab24dd8628b065be812d Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Thu, 16 Apr 2026 11:25:09 +0800 Subject: [PATCH 2/2] test(confignode): assert canonical source password key --- .../procedure/impl/pipe/task/CreatePipeProcedureV2Test.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java index 4548471743104..ac6ca8c740c36 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java @@ -96,7 +96,8 @@ public void testCheckAndEnrichSourceAuthenticationWithEncryptedPassword() { CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env, sourceAttributes); - assertEquals("encrypted-password", sourceAttributes.get("password")); + assertEquals( + "encrypted-password", sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); Mockito.verify(permissionManager).login("user", "encrypted-password", true); Mockito.verify(permissionManager, Mockito.never()) .login4Pipe(Mockito.anyString(), Mockito.any()); @@ -123,7 +124,8 @@ public void testCheckAndEnrichSourceAuthenticationFallsBackToRawPassword() { CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env, sourceAttributes); - assertEquals("hashed-password", sourceAttributes.get("password")); + assertEquals( + "hashed-password", sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); Mockito.verify(permissionManager).login("user", "raw-password", true); Mockito.verify(permissionManager).login4Pipe("user", "raw-password"); }