From 7a4ff015c3b512e7f86d7c69954fe7a670ed5a5b Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 15 Apr 2026 17:14:26 +0800 Subject: [PATCH 1/4] [To dev/1.3] Enhance the last query permission && Fixed the rollback version of alter view / table plans && Deleted the unnecessary mods in Tree view deletion (#17465) --- .../iotdb/db/it/IoTDBRestServiceIT.java | 160 +++++++++++ .../apache/iotdb/db/it/utils/TestUtils.java | 26 ++ .../iotdb/session/it/IoTDBSessionQueryIT.java | 77 +++++ .../table/RenameTableColumnProcedure.java | 247 ++++++++++++++++ .../table/SetTablePropertiesProcedure.java | 264 ++++++++++++++++++ .../rest/v2/impl/RestApiServiceImpl.java | 24 +- .../thrift/impl/ClientRPCServiceImpl.java | 72 +++-- .../plan/parser/StatementGenerator.java | 2 +- .../schemaregion/ISchemaRegion.java | 4 +- .../impl/SchemaRegionMemoryImpl.java | 6 +- .../impl/SchemaRegionPBTreeImpl.java | 4 +- .../impl/mem/MTreeBelowSGMemoryImpl.java | 7 +- .../iotdb/commons/path/PathPatternTree.java | 49 ++-- 13 files changed, 878 insertions(+), 64 deletions(-) create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableColumnProcedure.java create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/SetTablePropertiesProcedure.java diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java index ad1b93ad2d85b..61513272c29ab 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.it; +import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.env.cluster.env.SimpleEnv; @@ -57,6 +58,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -513,6 +515,7 @@ public void insertAndQuery() { selectLast(httpClient); queryV2(httpClient); + selectFastLast(httpClient); queryGroupByLevelV2(httpClient); queryRowLimitV2(httpClient); queryShowChildPathsV2(httpClient); @@ -923,6 +926,71 @@ public void queryWithWrongAuthorization() { } } + @Test + public void queryFastLastWithWrongAuthorization() { + CloseableHttpResponse response = null; + + TestUtils.executeNonQuery("create user abcd 'strongPassword@1234'"); + try { + final CloseableHttpClient httpClient = HttpClientBuilder.create().build(); + final HttpPost httpPost = new HttpPost("http://127.0.0.1:" + port + "/rest/v2/fastLastQuery"); + httpPost.addHeader("Content-type", "application/json; charset=utf-8"); + httpPost.setHeader("Accept", "application/json"); + final String authorization = getAuthorization("abcd", "strongPassword@1234"); + httpPost.setHeader("Authorization", authorization); + final String sql = "{\"prefix_paths\":[\"root\",\"sg25\"]}"; + httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset())); + for (int i = 0; i < 30; i++) { + try { + response = httpClient.execute(httpPost); + break; + } catch (Exception e) { + if (i == 29) { + throw e; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + String message = EntityUtils.toString(response.getEntity(), "utf-8"); + ObjectMapper mapper = new ObjectMapper(); + Map map = mapper.readValue(message, Map.class); + List timestampsResult = (List) map.get("timestamps"); + List expressionsResult = (List) map.get("expressions"); + List> valuesResult = (List>) map.get("values"); + Assert.assertTrue(map.size() > 0); + List expressions = + new ArrayList() { + { + add("Timeseries"); + add("Value"); + add("DataType"); + } + }; + + Assert.assertEquals(expressions, expressionsResult); + Assert.assertEquals(Collections.emptyList(), timestampsResult); + Assert.assertEquals(Collections.emptyList(), valuesResult); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + try { + if (response != null) { + response.close(); + } + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + } + public void query(CloseableHttpClient httpClient) { CloseableHttpResponse response = null; try { @@ -1677,6 +1745,98 @@ public void queryV2(CloseableHttpClient httpClient) { } } + public void selectFastLast(CloseableHttpClient httpClient) { + // Only used in 1D scenarios + if (EnvFactory.getEnv().getDataNodeWrapperList().size() > 1) { + return; + } + CloseableHttpResponse response = null; + try { + HttpPost httpPost = getHttpPost("http://127.0.0.1:" + port + "/rest/v2/fastLastQuery"); + String sql = "{\"prefix_paths\":[\"root\",\"sg25\"]}"; + httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset())); + response = httpClient.execute(httpPost); + HttpEntity responseEntity = response.getEntity(); + String message = EntityUtils.toString(responseEntity, "utf-8"); + ObjectMapper mapper = new ObjectMapper(); + Map map = mapper.readValue(message, Map.class); + List timestampsResult = (List) map.get("timestamps"); + List expressionsResult = (List) map.get("expressions"); + List> valuesResult = (List>) map.get("values"); + Assert.assertTrue(map.size() > 0); + List expressions = + new ArrayList() { + { + add("Timeseries"); + add("Value"); + add("DataType"); + } + }; + List timestamps = + new ArrayList() { + { + add(1635232153960l); + add(1635232153960l); + add(1635232153960l); + add(1635232143960l); + add(1635232153960l); + add(1635232153960l); + } + }; + List values1 = + new ArrayList() { + { + add("root.sg25.s3"); + add("root.sg25.s4"); + add("root.sg25.s5"); + add("root.sg25.s6"); + add("root.sg25.s7"); + add("root.sg25.s8"); + } + }; + List values2 = + new ArrayList() { + { + add(""); + add("2"); + add("1635000012345556"); + add("1.41"); + add("false"); + add("3.5555"); + } + }; + List values3 = + new ArrayList() { + { + add("TEXT"); + add("INT32"); + add("INT64"); + add("FLOAT"); + add("BOOLEAN"); + add("DOUBLE"); + } + }; + + Assert.assertEquals(expressions, expressionsResult); + Assert.assertEquals(timestamps, timestampsResult); + Assert.assertEquals(values1, valuesResult.get(0)); + Assert.assertEquals(values2, valuesResult.get(1)); + Assert.assertEquals(values3, valuesResult.get(2)); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + try { + if (response != null) { + response.close(); + } + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + } + public void queryGroupByLevelV2(CloseableHttpClient httpClient) { CloseableHttpResponse response = null; try { diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java index abf593670825f..f45209c788858 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java @@ -684,6 +684,32 @@ public static void assertResultSetEqual( } } + public static void assertResultSetEqual( + SessionDataSet actualResultSet, + List expectedColumnNames, + Set expectedRetSet, + boolean ignoreTimeStamp) { + final Set copiedSet = new HashSet<>(expectedRetSet); + try { + List actualColumnNames = actualResultSet.getColumnNames(); + if (ignoreTimeStamp) { + assertEquals(expectedColumnNames, actualColumnNames); + } else { + assertEquals(TIMESTAMP_STR, actualColumnNames.get(0)); + assertEquals(expectedColumnNames, actualColumnNames.subList(1, actualColumnNames.size())); + } + + while (actualResultSet.hasNext()) { + RowRecord rowRecord = actualResultSet.next(); + assertTrue(copiedSet.remove(rowRecord.toString().replace('\t', ','))); + } + assertEquals(0, copiedSet.size()); + } catch (IoTDBConnectionException | StatementExecutionException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + public static void createUser(String userName, String password) { createUser(EnvFactory.getEnv(), userName, password); } diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionQueryIT.java index ba5c22ab9969e..8ba5fcc617093 100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionQueryIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionQueryIT.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.it.utils.AlignedWriteUtil; +import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.isession.ISession; import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.it.env.EnvFactory; @@ -29,6 +30,7 @@ import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.LocalStandaloneIT; import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.RedirectException; import org.apache.iotdb.rpc.StatementExecutionException; import org.junit.AfterClass; @@ -244,6 +246,81 @@ public void lastQueryForOneDeviceNoSchema() throws IoTDBConnectionException { } } + @Test + public void lastQueryWithPrefixTest() throws IoTDBConnectionException { + // Only used in 1D scenarios + if (EnvFactory.getEnv().getDataNodeWrapperList().size() > 1) { + return; + } + final Set retArray = + new HashSet<>( + Arrays.asList( + "30,root.sg1.d1.s3,30,INT64", + "30,root.sg1.d1.s4,false,BOOLEAN", + "40,root.sg1.d1.s5,aligned_test40,TEXT", + "23,root.sg1.d1.s1,230000.0,FLOAT", + "40,root.sg1.d1.s2,40,INT32")); + + try (final ISession session = EnvFactory.getEnv().getSessionConnection()) { + // Push last cache first + try (final SessionDataSet resultSet = + session.executeFastLastDataQueryForOnePrefixPath(Arrays.asList("root", "sg1", "d1"))) { + assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true); + } + + try (final SessionDataSet resultSet = + session.executeFastLastDataQueryForOnePrefixPath(Arrays.asList("root", "sg1", "d1"))) { + assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true); + } + } catch (StatementExecutionException | RedirectException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void lastQueryWithoutPermissionTest() throws IoTDBConnectionException { + // Only used in 1D scenarios + if (EnvFactory.getEnv().getDataNodeWrapperList().size() > 1) { + return; + } + final String[] retArray = new String[] {}; + final Set retArray2 = + new HashSet<>( + Arrays.asList( + "30,root.sg1.d1.s3,30,INT64", + "30,root.sg1.d1.s4,false,BOOLEAN", + "40,root.sg1.d1.s5,aligned_test40,TEXT", + "23,root.sg1.d1.s1,230000.0,FLOAT", + "40,root.sg1.d1.s2,40,INT32")); + TestUtils.executeNonQuery(EnvFactory.getEnv(), "create user abcd 'veryComplexPassword@123'"); + + try (final ISession session = + EnvFactory.getEnv().getSessionConnection("abcd", "veryComplexPassword@123"); + final ISession rootSession = EnvFactory.getEnv().getSessionConnection()) { + // Push last cache first + try (final SessionDataSet resultSet = + rootSession.executeFastLastDataQueryForOnePrefixPath( + Arrays.asList("root", "sg1", "d1"))) { + assertResultSetEqual(resultSet, lastQueryColumnNames, retArray2, true); + } + + try (final SessionDataSet resultSet = + session.executeLastDataQueryForOneDevice( + "root.sg1", "root.sg1.d1", Arrays.asList("notExist", "s1"), true)) { + assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true); + } + + try (final SessionDataSet resultSet = + session.executeFastLastDataQueryForOnePrefixPath(Arrays.asList("root", "sg1", "d1"))) { + assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true); + } + } catch (StatementExecutionException | RedirectException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + // ------------------------------ Aggregation Query ------------------------------ @Test public void aggregationQueryTest() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableColumnProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableColumnProcedure.java new file mode 100644 index 0000000000000..a2a6c72577cf9 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableColumnProcedure.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.schema.table; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.confignode.consensus.request.write.table.RenameTableColumnPlan; +import org.apache.iotdb.confignode.consensus.request.write.table.view.RenameViewColumnPlan; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.impl.schema.table.view.RenameViewColumnProcedure; +import org.apache.iotdb.confignode.procedure.state.schema.RenameTableColumnState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class RenameTableColumnProcedure + extends AbstractAlterOrDropTableProcedure { + private static final Logger LOGGER = LoggerFactory.getLogger(RenameTableColumnProcedure.class); + + private String oldName; + private String newName; + + public RenameTableColumnProcedure(final boolean isGeneratedByPipe) { + super(isGeneratedByPipe); + } + + public RenameTableColumnProcedure( + final String database, + final String tableName, + final String queryId, + final String oldName, + final String newName, + final boolean isGeneratedByPipe) { + super(database, tableName, queryId, isGeneratedByPipe); + this.oldName = oldName; + this.newName = newName; + } + + @Override + protected Flow executeFromState( + final ConfigNodeProcedureEnv env, final RenameTableColumnState state) + throws InterruptedException { + final long startTime = System.currentTimeMillis(); + try { + switch (state) { + case COLUMN_CHECK: + LOGGER.info("Column check for table {}.{} when renaming column", database, tableName); + columnCheck(env); + break; + case PRE_RELEASE: + LOGGER.info("Pre release info of table {}.{} when renaming column", database, tableName); + preRelease(env); + break; + case RENAME_COLUMN: + LOGGER.info("Rename column to table {}.{} on config node", database, tableName); + renameColumn(env); + break; + case COMMIT_RELEASE: + LOGGER.info( + "Commit release info of table {}.{} when renaming column", database, tableName); + commitRelease(env); + return Flow.NO_MORE_STATE; + default: + setFailure(new ProcedureException("Unrecognized RenameTableColumnState " + state)); + return Flow.NO_MORE_STATE; + } + return Flow.HAS_MORE_STATE; + } finally { + LOGGER.info( + "RenameTableColumn-{}.{}-{} costs {}ms", + database, + tableName, + state, + (System.currentTimeMillis() - startTime)); + } + } + + private void columnCheck(final ConfigNodeProcedureEnv env) { + try { + final Pair result = + env.getConfigManager() + .getClusterSchemaManager() + .tableColumnCheckForColumnRenaming( + database, tableName, oldName, newName, this instanceof RenameViewColumnProcedure); + final TSStatus status = result.getLeft(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + setFailure(new ProcedureException(new IoTDBException(status))); + return; + } + table = result.getRight(); + setNextState(RenameTableColumnState.PRE_RELEASE); + } catch (final MetadataException e) { + setFailure(new ProcedureException(e)); + } + } + + @Override + protected void preRelease(final ConfigNodeProcedureEnv env) { + super.preRelease(env); + setNextState(RenameTableColumnState.RENAME_COLUMN); + } + + private void renameColumn(final ConfigNodeProcedureEnv env) { + final TSStatus status = + env.getConfigManager() + .getClusterSchemaManager() + .executePlan( + this instanceof RenameViewColumnProcedure + ? new RenameViewColumnPlan(database, tableName, oldName, newName) + : new RenameTableColumnPlan(database, tableName, oldName, newName), + isGeneratedByPipe); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + setFailure(new ProcedureException(new IoTDBException(status))); + } else { + setNextState(RenameTableColumnState.COMMIT_RELEASE); + } + } + + @Override + protected void rollbackState(final ConfigNodeProcedureEnv env, final RenameTableColumnState state) + throws IOException, InterruptedException, ProcedureException { + final long startTime = System.currentTimeMillis(); + try { + switch (state) { + case RENAME_COLUMN: + LOGGER.info( + "Start rollback Renaming column to table {}.{} on configNode", + database, + table.getTableName()); + rollbackRenameColumn(env); + break; + case PRE_RELEASE: + LOGGER.info( + "Start rollback pre release info of table {}.{}", database, table.getTableName()); + rollbackPreRelease(env); + break; + } + } finally { + LOGGER.info( + "Rollback RenameTableColumn-{} costs {}ms.", + state, + (System.currentTimeMillis() - startTime)); + } + } + + private void rollbackRenameColumn(final ConfigNodeProcedureEnv env) { + if (table == null) { + return; + } + final TSStatus status = + env.getConfigManager() + .getClusterSchemaManager() + .executePlan( + this instanceof RenameViewColumnProcedure + ? new RenameViewColumnPlan(database, tableName, newName, oldName) + : new RenameTableColumnPlan(database, tableName, newName, oldName), + isGeneratedByPipe); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + setFailure(new ProcedureException(new IoTDBException(status))); + } + } + + @Override + protected RenameTableColumnState getState(final int stateId) { + return RenameTableColumnState.values()[stateId]; + } + + @Override + protected int getStateId(final RenameTableColumnState state) { + return state.ordinal(); + } + + @Override + protected RenameTableColumnState getInitialState() { + return RenameTableColumnState.COLUMN_CHECK; + } + + @Override + protected String getActionMessage() { + return "rename table column"; + } + + @Override + public void serialize(final DataOutputStream stream) throws IOException { + stream.writeShort( + isGeneratedByPipe + ? ProcedureType.PIPE_ENRICHED_RENAME_TABLE_COLUMN_PROCEDURE.getTypeCode() + : ProcedureType.RENAME_TABLE_COLUMN_PROCEDURE.getTypeCode()); + innerSerialize(stream); + } + + protected void innerSerialize(final DataOutputStream stream) throws IOException { + super.serialize(stream); + + ReadWriteIOUtils.write(oldName, stream); + ReadWriteIOUtils.write(newName, stream); + } + + @Override + public void deserialize(final ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + + this.oldName = ReadWriteIOUtils.readString(byteBuffer); + this.newName = ReadWriteIOUtils.readString(byteBuffer); + } + + @Override + public boolean equals(final Object o) { + return super.equals(o) + && Objects.equals(oldName, ((RenameTableColumnProcedure) o).oldName) + && Objects.equals(newName, ((RenameTableColumnProcedure) o).newName); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), oldName, newName); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/SetTablePropertiesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/SetTablePropertiesProcedure.java new file mode 100644 index 0000000000000..138ae9c9b50ea --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/SetTablePropertiesProcedure.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.schema.table; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.confignode.consensus.request.write.table.SetTablePropertiesPlan; +import org.apache.iotdb.confignode.consensus.request.write.table.view.SetViewPropertiesPlan; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.impl.schema.table.view.SetViewPropertiesProcedure; +import org.apache.iotdb.confignode.procedure.state.schema.SetTablePropertiesState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.iotdb.confignode.procedure.state.schema.SetTablePropertiesState.COMMIT_RELEASE; +import static org.apache.iotdb.confignode.procedure.state.schema.SetTablePropertiesState.PRE_RELEASE; +import static org.apache.iotdb.confignode.procedure.state.schema.SetTablePropertiesState.SET_PROPERTIES; +import static org.apache.iotdb.confignode.procedure.state.schema.SetTablePropertiesState.VALIDATE_TABLE; + +public class SetTablePropertiesProcedure + extends AbstractAlterOrDropTableProcedure { + + private static final Logger LOGGER = LoggerFactory.getLogger(SetTablePropertiesProcedure.class); + + private Map originalProperties = new HashMap<>(); + private Map updatedProperties; + + public SetTablePropertiesProcedure(final boolean isGeneratedByPipe) { + super(isGeneratedByPipe); + } + + public SetTablePropertiesProcedure( + final String database, + final String tableName, + final String queryId, + final Map properties, + final boolean isGeneratedByPipe) { + super(database, tableName, queryId, isGeneratedByPipe); + this.updatedProperties = properties; + } + + @Override + protected Flow executeFromState( + final ConfigNodeProcedureEnv env, final SetTablePropertiesState state) + throws InterruptedException { + final long startTime = System.currentTimeMillis(); + try { + switch (state) { + case VALIDATE_TABLE: + validateTable(env); + LOGGER.info( + "Validate table for table {}.{} when setting properties", database, tableName); + if (!isFailed() && Objects.isNull(table)) { + LOGGER.info( + "The updated table has the same properties with the original one. Skip the procedure."); + return Flow.NO_MORE_STATE; + } + break; + case PRE_RELEASE: + preRelease(env); + LOGGER.info( + "Pre release info for table {}.{} when setting properties", database, tableName); + break; + case SET_PROPERTIES: + setProperties(env); + LOGGER.info("Set properties to table {}.{}", database, tableName); + break; + case COMMIT_RELEASE: + commitRelease(env); + LOGGER.info( + "Commit release info of table {}.{} when setting properties", database, tableName); + return Flow.NO_MORE_STATE; + default: + setFailure(new ProcedureException("Unrecognized AddTableColumnState " + state)); + return Flow.NO_MORE_STATE; + } + return Flow.HAS_MORE_STATE; + } finally { + LOGGER.info( + "SetTableProperties-{}.{}-{} costs {}ms", + database, + tableName, + state, + (System.currentTimeMillis() - startTime)); + } + } + + private void validateTable(final ConfigNodeProcedureEnv env) { + try { + final Pair result = + env.getConfigManager() + .getClusterSchemaManager() + .updateTableProperties( + database, + tableName, + originalProperties, + updatedProperties, + this instanceof SetViewPropertiesProcedure); + final TSStatus status = result.getLeft(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + setFailure(new ProcedureException(new IoTDBException(status))); + return; + } + table = result.getRight(); + setNextState(PRE_RELEASE); + } catch (final MetadataException e) { + setFailure(new ProcedureException(e)); + } + } + + @Override + protected void preRelease(final ConfigNodeProcedureEnv env) { + super.preRelease(env); + setNextState(SET_PROPERTIES); + } + + private void setProperties(final ConfigNodeProcedureEnv env) { + final TSStatus status = + env.getConfigManager() + .getClusterSchemaManager() + .executePlan( + this instanceof SetViewPropertiesProcedure + ? new SetViewPropertiesPlan(database, tableName, updatedProperties) + : new SetTablePropertiesPlan(database, tableName, updatedProperties), + isGeneratedByPipe); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + setFailure(new ProcedureException(new IoTDBException(status))); + } else { + setNextState(COMMIT_RELEASE); + } + } + + @Override + protected String getActionMessage() { + return "set table properties"; + } + + @Override + protected void rollbackState( + final ConfigNodeProcedureEnv env, final SetTablePropertiesState state) + throws IOException, InterruptedException, ProcedureException { + final long startTime = System.currentTimeMillis(); + try { + switch (state) { + case PRE_RELEASE: + LOGGER.info( + "Start rollback pre release info for table {}.{} when setting properties", + database, + table.getTableName()); + rollbackPreRelease(env); + break; + case SET_PROPERTIES: + LOGGER.info( + "Start rollback set properties to table {}.{}", database, table.getTableName()); + rollbackSetProperties(env); + break; + } + } finally { + LOGGER.info( + "Rollback SetTableProperties-{} costs {}ms.", + state, + (System.currentTimeMillis() - startTime)); + } + } + + private void rollbackSetProperties(final ConfigNodeProcedureEnv env) { + if (table == null) { + return; + } + final TSStatus status = + env.getConfigManager() + .getClusterSchemaManager() + .executePlan( + this instanceof SetViewPropertiesProcedure + ? new SetViewPropertiesPlan(database, tableName, originalProperties) + : new SetTablePropertiesPlan(database, tableName, originalProperties), + isGeneratedByPipe); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + setFailure(new ProcedureException(new IoTDBException(status))); + } + } + + @Override + protected SetTablePropertiesState getState(final int stateId) { + return SetTablePropertiesState.values()[stateId]; + } + + @Override + protected int getStateId(final SetTablePropertiesState state) { + return state.ordinal(); + } + + @Override + protected SetTablePropertiesState getInitialState() { + return VALIDATE_TABLE; + } + + @Override + public void serialize(final DataOutputStream stream) throws IOException { + stream.writeShort( + isGeneratedByPipe + ? ProcedureType.PIPE_ENRICHED_SET_TABLE_PROPERTIES_PROCEDURE.getTypeCode() + : ProcedureType.SET_TABLE_PROPERTIES_PROCEDURE.getTypeCode()); + innerSerialize(stream); + } + + protected void innerSerialize(final DataOutputStream stream) throws IOException { + super.serialize(stream); + + ReadWriteIOUtils.write(originalProperties, stream); + ReadWriteIOUtils.write(updatedProperties, stream); + } + + @Override + public void deserialize(final ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + + this.originalProperties = ReadWriteIOUtils.readMap(byteBuffer); + this.updatedProperties = ReadWriteIOUtils.readMap(byteBuffer); + } + + @Override + public boolean equals(final Object o) { + return super.equals(o) + && Objects.equals(updatedProperties, ((SetTablePropertiesProcedure) o).updatedProperties); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), updatedProperties); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java index 05fc4b8008465..0030980634275 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java @@ -55,6 +55,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.schemaengine.SchemaEngine; import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion; import org.apache.iotdb.db.utils.CommonUtils; @@ -102,7 +103,7 @@ public RestApiServiceImpl() { public Response executeFastLastQueryStatement( PrefixPathList prefixPathList, SecurityContext securityContext) { Long queryId = null; - Statement statement = null; + QueryStatement statement = null; boolean finish = false; long startTime = System.nanoTime(); @@ -113,28 +114,31 @@ public Response executeFastLastQueryStatement( new PartialPath(prefixPathList.getPrefixPaths().toArray(new String[0])); final Map> resultMap = new HashMap<>(); + // Check permission, the cost is rather low because the req only contains one prefix path + final IClientSession clientSession = SESSION_MANAGER.getCurrSession(); + final TSLastDataQueryReq tsLastDataQueryReq = + FastLastHandler.createTSLastDataQueryReq(clientSession, prefixPathList); + statement = StatementGenerator.createStatement(tsLastDataQueryReq); + + final Response response = authorizationHandler.checkAuthority(securityContext, statement); + if (response != null) { + return response; + } + final String prefixString = prefixPath.toString(); for (ISchemaRegion region : SchemaEngine.getInstance().getAllSchemaRegions()) { if (!prefixString.startsWith(region.getDatabaseFullPath()) && !region.getDatabaseFullPath().startsWith(prefixString)) { continue; } - region.fillLastQueryMap(prefixPath, resultMap); + region.fillLastQueryMap(prefixPath, resultMap, statement.getAuthorityScope()); } // Check cache first if (!DataNodeSchemaCache.getInstance().getDeviceSchemaCache().getLastCache(resultMap)) { - IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); - TSLastDataQueryReq tsLastDataQueryReq = - FastLastHandler.createTSLastDataQueryReq(clientSession, prefixPathList); - statement = StatementGenerator.createStatement(tsLastDataQueryReq); - if (ExecuteStatementHandler.validateStatement(statement)) { return FastLastHandler.buildErrorResponse(TSStatusCode.EXECUTE_STATEMENT_ERROR); } - Optional.ofNullable(authorizationHandler.checkAuthority(securityContext, statement)) - .ifPresent(Response.class::cast); - queryId = SESSION_MANAGER.requestQueryId(); SessionInfo sessionInfo = SESSION_MANAGER.getSessionInfo(clientSession); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 8a498fce2e3d3..2ae50d3605718 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -37,6 +37,7 @@ import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.audit.AuditLogger; import org.apache.iotdb.db.auth.AuthorityChecker; @@ -87,6 +88,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateAlignedTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesStatement; @@ -819,7 +821,7 @@ public TSExecuteStatementResp executeFastLastDataQueryForOnePrefixPath( try { final long queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); - // 1. Map ISchemaFetcher.getAllSensors(prefix) ~= 50ms + // 1.1 Map ISchemaFetcher.getAllSensors(prefix) ~= 50ms final PartialPath prefixPath = new PartialPath(req.getPrefixes().toArray(new String[0])); if (prefixPath.hasWildcard()) { @@ -832,13 +834,20 @@ public TSExecuteStatementResp executeFastLastDataQueryForOnePrefixPath( final Map> resultMap = new HashMap<>(); int sensorNum = 0; + // 1.2 Check permission, the cost is rather low because the req only contains one prefix path + final QueryStatement s = StatementGenerator.createStatement(convert(req)); + final TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return RpcUtils.getTSExecuteStatementResp(status); + } + final String prefixString = prefixPath.toString(); for (final ISchemaRegion region : SchemaEngine.getInstance().getAllSchemaRegions()) { if (!prefixString.startsWith(region.getDatabaseFullPath()) && !region.getDatabaseFullPath().startsWith(prefixString)) { continue; } - sensorNum += region.fillLastQueryMap(prefixPath, resultMap); + sensorNum += region.fillLastQueryMap(prefixPath, resultMap, s.getAuthorityScope()); } // 2.DATA_NODE_SCHEMA_CACHE.getLastCache() @@ -914,6 +923,13 @@ public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2( long startTime = System.nanoTime(); Throwable t = null; try { + // Place the permission check first + final QueryStatement s = StatementGenerator.createStatement(convert(req)); + final TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return RpcUtils.getTSExecuteStatementResp(status); + } + String db; String deviceId; PartialPath devicePath; @@ -969,9 +985,11 @@ public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2( regionReplicaSet -> isSameNode( regionReplicaSet.dataNodeLocations.get(0).mPPDataExchangeEndPoint)); - int sensorNum = req.sensors.size(); - TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum); + final int sensorNum = req.sensors.size(); + final TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum); boolean allCached = true; + + PathPatternTree queryTree = new PathPatternTree(); for (String sensor : req.sensors) { PartialPath fullPath; if (req.isLegalPathNodes()) { @@ -979,24 +997,32 @@ public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2( } else { fullPath = devicePath.concatNode((new PartialPath(sensor)).getFullPath()); } - TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(fullPath); - if (timeValuePair == null) { - allCached = false; - break; - } else if (timeValuePair.getValue() == null) { - // there is no data for this sensor - if (!canUseNullEntry) { + queryTree.appendPathPattern(fullPath); + } + queryTree.constructTree(); + queryTree = s.getAuthorityScope().intersectWithFullPathPrefixTree(queryTree); + + if (!queryTree.isEmpty()) { + for (final MeasurementPath fullPath : queryTree.getAllPathPatterns(true)) { + TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(fullPath); + if (timeValuePair == null) { allCached = false; break; + } else if (timeValuePair.getValue() == null) { + // there is no data for this sensor + if (!canUseNullEntry) { + allCached = false; + break; + } + } else { + // we don't consider TTL + LastQueryUtil.appendLastValue( + builder, + timeValuePair.getTimestamp(), + new Binary(fullPath.getFullPath(), TSFileConfig.STRING_CHARSET), + timeValuePair.getValue().getStringValue(), + timeValuePair.getValue().getDataType().name()); } - } else { - // we don't consider TTL - LastQueryUtil.appendLastValue( - builder, - timeValuePair.getTimestamp(), - new Binary(fullPath.getFullPath(), TSFileConfig.STRING_CHARSET), - timeValuePair.getValue().getStringValue(), - timeValuePair.getValue().getDataType().name()); } } // cache hit @@ -1016,14 +1042,6 @@ public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2( } } - // cache miss - Statement s = StatementGenerator.createStatement(convert(req)); - // permission check - TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return RpcUtils.getTSExecuteStatementResp(status); - } - quota = DataNodeThrottleQuotaManager.getInstance() .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java index b220309df5832..9f149abd531eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java @@ -170,7 +170,7 @@ public static Statement createStatement(TSRawDataQueryReq rawDataQueryReq) return queryStatement; } - public static Statement createStatement(TSLastDataQueryReq lastDataQueryReq) + public static QueryStatement createStatement(TSLastDataQueryReq lastDataQueryReq) throws IllegalPathException { final long startTime = System.nanoTime(); // construct query statement diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java index df69657f1d1fb..7248fe260463a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java @@ -317,7 +317,9 @@ long countPathsUsingTemplate(int templateId, PathPatternTree patternTree) throws MetadataException; int fillLastQueryMap( - final PartialPath pattern, final Map> mapToFill) + final PartialPath pattern, + final Map> mapToFill, + final PathPatternTree scope) throws MetadataException; // endregion diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index 7f8074c61ff30..2f75914e51fa2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -1335,9 +1335,11 @@ public long countPathsUsingTemplate(int templateId, PathPatternTree patternTree) @Override public int fillLastQueryMap( - final PartialPath pattern, final Map> mapToFill) + final PartialPath pattern, + final Map> mapToFill, + final PathPatternTree scope) throws MetadataException { - return mtree.fillLastQueryMap(pattern, mapToFill); + return mTree.fillLastQueryMap(pattern, mapToFill, scope); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java index fd8d7c5f1b550..73bbce272e2fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java @@ -1440,7 +1440,9 @@ public long countPathsUsingTemplate(int templateId, PathPatternTree patternTree) @Override public int fillLastQueryMap( - final PartialPath pattern, final Map> mapToFill) { + final PartialPath pattern, + final Map> mapToFill, + final PathPatternTree scope) { throw new UnsupportedOperationException("Not implemented"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java index 63d1e4c097f1b..b6f1221f7013f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java @@ -1085,12 +1085,13 @@ public IDeviceSchemaInfo next() { } public int fillLastQueryMap( - final PartialPath prefixPath, final Map> mapToFill) + final PartialPath prefixPath, + final Map> mapToFill, + final PathPatternTree scope) throws MetadataException { final int[] sensorNum = {0}; try (final EntityUpdater updater = - new EntityUpdater( - rootNode, prefixPath, store, true, SchemaConstant.ALL_MATCH_SCOPE) { + new EntityUpdater(rootNode, prefixPath, store, true, scope) { @Override protected void updateEntity(final IDeviceMNode node) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternTree.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternTree.java index 3b40a3b2ae202..53014d0c3a529 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternTree.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternTree.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; public class PathPatternTree { @@ -94,7 +95,6 @@ public void appendFullPath(PartialPath devicePath, String measurement) { appendBranchWithoutPrune(root, pathNodes, 0); } - /** Add a pathPattern (may contain wildcards) to pathPatternList. */ public void appendPathPattern(PartialPath pathPattern) { if (useWildcard) { boolean isExist = false; @@ -120,7 +120,8 @@ public void constructTree() { for (PartialPath path : pathPatternList) { appendBranchWithoutPrune(root, path.getNodes(), 0); } - pathPatternList.clear(); + // Do not clear to avoid concurrent modification + pathPatternList = new LinkedList<>(); } private void appendBranchWithoutPrune( @@ -245,16 +246,24 @@ private void searchDevicePath( public List getAllPathPatterns() { List result = new ArrayList<>(); Deque ancestors = new ArrayDeque<>(); - searchPathPattern(root, ancestors, result); + searchPathPattern(root, ancestors, result, false); return result; } - private void searchPathPattern( + public List getAllPathPatterns(boolean asMeasurementPath) { + List result = new ArrayList<>(); + Deque ancestors = new ArrayDeque<>(); + searchPathPattern(root, ancestors, result, asMeasurementPath); + return result; + } + + private void searchPathPattern( PathPatternNode node, Deque ancestors, - List fullPaths) { + List fullPaths, + boolean asMeasurementPath) { if (node.isPathPattern()) { - fullPaths.add(convertNodesToPartialPath(node, ancestors)); + fullPaths.add((T) convertNodesToPartialPath(node, ancestors, asMeasurementPath)); if (node.isLeaf()) { return; } @@ -262,23 +271,19 @@ private void searchPathPattern( ancestors.push(node.getName()); for (PathPatternNode child : node.getChildren().values()) { - searchPathPattern(child, ancestors, fullPaths); + searchPathPattern(child, ancestors, fullPaths, asMeasurementPath); } ancestors.pop(); } - public List getOverlappedPathPatterns(PartialPath pattern) { - if (pathPatternList.isEmpty()) { - pathPatternList = getAllPathPatterns(); + public List getOverlappedPathPatterns(final PartialPath pattern) { + List patternList = pathPatternList; + if (Objects.isNull(patternList) || patternList.isEmpty()) { + patternList = getAllPathPatterns(); + pathPatternList = patternList; } - List results = new ArrayList<>(); - for (PartialPath path : pathPatternList) { - if (pattern.overlapWith(path)) { - results.add(path); - } - } - return results; + return patternList.stream().filter(pattern::overlapWith).collect(Collectors.toList()); } private String convertNodesToString(List nodes) { @@ -290,14 +295,20 @@ private String convertNodesToString(List nodes) { } private PartialPath convertNodesToPartialPath( - PathPatternNode node, Deque ancestors) { + PathPatternNode node, + Deque ancestors, + boolean asMeasurementPath) { Iterator iterator = ancestors.descendingIterator(); List nodeList = new ArrayList<>(ancestors.size() + 1); while (iterator.hasNext()) { nodeList.add(iterator.next()); } nodeList.add(node.getName()); - return new PartialPath(nodeList.toArray(new String[0])); + if (asMeasurementPath) { + return new MeasurementPath(nodeList.toArray(new String[0])); + } else { + return new PartialPath(nodeList.toArray(new String[0])); + } } public boolean isOverlapWith(PathPatternTree patternTree) { From 6f4ed440e10e36677036238f40da7af694108add Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 16 Apr 2026 10:21:53 +0800 Subject: [PATCH 2/4] dele --- .../table/RenameTableColumnProcedure.java | 247 ---------------- .../table/SetTablePropertiesProcedure.java | 264 ------------------ 2 files changed, 511 deletions(-) delete mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableColumnProcedure.java delete mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/SetTablePropertiesProcedure.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableColumnProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableColumnProcedure.java deleted file mode 100644 index a2a6c72577cf9..0000000000000 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableColumnProcedure.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.procedure.impl.schema.table; - -import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.exception.IoTDBException; -import org.apache.iotdb.commons.exception.MetadataException; -import org.apache.iotdb.commons.schema.table.TsTable; -import org.apache.iotdb.confignode.consensus.request.write.table.RenameTableColumnPlan; -import org.apache.iotdb.confignode.consensus.request.write.table.view.RenameViewColumnPlan; -import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; -import org.apache.iotdb.confignode.procedure.exception.ProcedureException; -import org.apache.iotdb.confignode.procedure.impl.schema.table.view.RenameViewColumnProcedure; -import org.apache.iotdb.confignode.procedure.state.schema.RenameTableColumnState; -import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.rpc.TSStatusCode; - -import org.apache.tsfile.utils.Pair; -import org.apache.tsfile.utils.ReadWriteIOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Objects; - -public class RenameTableColumnProcedure - extends AbstractAlterOrDropTableProcedure { - private static final Logger LOGGER = LoggerFactory.getLogger(RenameTableColumnProcedure.class); - - private String oldName; - private String newName; - - public RenameTableColumnProcedure(final boolean isGeneratedByPipe) { - super(isGeneratedByPipe); - } - - public RenameTableColumnProcedure( - final String database, - final String tableName, - final String queryId, - final String oldName, - final String newName, - final boolean isGeneratedByPipe) { - super(database, tableName, queryId, isGeneratedByPipe); - this.oldName = oldName; - this.newName = newName; - } - - @Override - protected Flow executeFromState( - final ConfigNodeProcedureEnv env, final RenameTableColumnState state) - throws InterruptedException { - final long startTime = System.currentTimeMillis(); - try { - switch (state) { - case COLUMN_CHECK: - LOGGER.info("Column check for table {}.{} when renaming column", database, tableName); - columnCheck(env); - break; - case PRE_RELEASE: - LOGGER.info("Pre release info of table {}.{} when renaming column", database, tableName); - preRelease(env); - break; - case RENAME_COLUMN: - LOGGER.info("Rename column to table {}.{} on config node", database, tableName); - renameColumn(env); - break; - case COMMIT_RELEASE: - LOGGER.info( - "Commit release info of table {}.{} when renaming column", database, tableName); - commitRelease(env); - return Flow.NO_MORE_STATE; - default: - setFailure(new ProcedureException("Unrecognized RenameTableColumnState " + state)); - return Flow.NO_MORE_STATE; - } - return Flow.HAS_MORE_STATE; - } finally { - LOGGER.info( - "RenameTableColumn-{}.{}-{} costs {}ms", - database, - tableName, - state, - (System.currentTimeMillis() - startTime)); - } - } - - private void columnCheck(final ConfigNodeProcedureEnv env) { - try { - final Pair result = - env.getConfigManager() - .getClusterSchemaManager() - .tableColumnCheckForColumnRenaming( - database, tableName, oldName, newName, this instanceof RenameViewColumnProcedure); - final TSStatus status = result.getLeft(); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - setFailure(new ProcedureException(new IoTDBException(status))); - return; - } - table = result.getRight(); - setNextState(RenameTableColumnState.PRE_RELEASE); - } catch (final MetadataException e) { - setFailure(new ProcedureException(e)); - } - } - - @Override - protected void preRelease(final ConfigNodeProcedureEnv env) { - super.preRelease(env); - setNextState(RenameTableColumnState.RENAME_COLUMN); - } - - private void renameColumn(final ConfigNodeProcedureEnv env) { - final TSStatus status = - env.getConfigManager() - .getClusterSchemaManager() - .executePlan( - this instanceof RenameViewColumnProcedure - ? new RenameViewColumnPlan(database, tableName, oldName, newName) - : new RenameTableColumnPlan(database, tableName, oldName, newName), - isGeneratedByPipe); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - setFailure(new ProcedureException(new IoTDBException(status))); - } else { - setNextState(RenameTableColumnState.COMMIT_RELEASE); - } - } - - @Override - protected void rollbackState(final ConfigNodeProcedureEnv env, final RenameTableColumnState state) - throws IOException, InterruptedException, ProcedureException { - final long startTime = System.currentTimeMillis(); - try { - switch (state) { - case RENAME_COLUMN: - LOGGER.info( - "Start rollback Renaming column to table {}.{} on configNode", - database, - table.getTableName()); - rollbackRenameColumn(env); - break; - case PRE_RELEASE: - LOGGER.info( - "Start rollback pre release info of table {}.{}", database, table.getTableName()); - rollbackPreRelease(env); - break; - } - } finally { - LOGGER.info( - "Rollback RenameTableColumn-{} costs {}ms.", - state, - (System.currentTimeMillis() - startTime)); - } - } - - private void rollbackRenameColumn(final ConfigNodeProcedureEnv env) { - if (table == null) { - return; - } - final TSStatus status = - env.getConfigManager() - .getClusterSchemaManager() - .executePlan( - this instanceof RenameViewColumnProcedure - ? new RenameViewColumnPlan(database, tableName, newName, oldName) - : new RenameTableColumnPlan(database, tableName, newName, oldName), - isGeneratedByPipe); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - setFailure(new ProcedureException(new IoTDBException(status))); - } - } - - @Override - protected RenameTableColumnState getState(final int stateId) { - return RenameTableColumnState.values()[stateId]; - } - - @Override - protected int getStateId(final RenameTableColumnState state) { - return state.ordinal(); - } - - @Override - protected RenameTableColumnState getInitialState() { - return RenameTableColumnState.COLUMN_CHECK; - } - - @Override - protected String getActionMessage() { - return "rename table column"; - } - - @Override - public void serialize(final DataOutputStream stream) throws IOException { - stream.writeShort( - isGeneratedByPipe - ? ProcedureType.PIPE_ENRICHED_RENAME_TABLE_COLUMN_PROCEDURE.getTypeCode() - : ProcedureType.RENAME_TABLE_COLUMN_PROCEDURE.getTypeCode()); - innerSerialize(stream); - } - - protected void innerSerialize(final DataOutputStream stream) throws IOException { - super.serialize(stream); - - ReadWriteIOUtils.write(oldName, stream); - ReadWriteIOUtils.write(newName, stream); - } - - @Override - public void deserialize(final ByteBuffer byteBuffer) { - super.deserialize(byteBuffer); - - this.oldName = ReadWriteIOUtils.readString(byteBuffer); - this.newName = ReadWriteIOUtils.readString(byteBuffer); - } - - @Override - public boolean equals(final Object o) { - return super.equals(o) - && Objects.equals(oldName, ((RenameTableColumnProcedure) o).oldName) - && Objects.equals(newName, ((RenameTableColumnProcedure) o).newName); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), oldName, newName); - } -} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/SetTablePropertiesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/SetTablePropertiesProcedure.java deleted file mode 100644 index 138ae9c9b50ea..0000000000000 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/SetTablePropertiesProcedure.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.procedure.impl.schema.table; - -import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.exception.IoTDBException; -import org.apache.iotdb.commons.exception.MetadataException; -import org.apache.iotdb.commons.schema.table.TsTable; -import org.apache.iotdb.confignode.consensus.request.write.table.SetTablePropertiesPlan; -import org.apache.iotdb.confignode.consensus.request.write.table.view.SetViewPropertiesPlan; -import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; -import org.apache.iotdb.confignode.procedure.exception.ProcedureException; -import org.apache.iotdb.confignode.procedure.impl.schema.table.view.SetViewPropertiesProcedure; -import org.apache.iotdb.confignode.procedure.state.schema.SetTablePropertiesState; -import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.rpc.TSStatusCode; - -import org.apache.tsfile.utils.Pair; -import org.apache.tsfile.utils.ReadWriteIOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -import static org.apache.iotdb.confignode.procedure.state.schema.SetTablePropertiesState.COMMIT_RELEASE; -import static org.apache.iotdb.confignode.procedure.state.schema.SetTablePropertiesState.PRE_RELEASE; -import static org.apache.iotdb.confignode.procedure.state.schema.SetTablePropertiesState.SET_PROPERTIES; -import static org.apache.iotdb.confignode.procedure.state.schema.SetTablePropertiesState.VALIDATE_TABLE; - -public class SetTablePropertiesProcedure - extends AbstractAlterOrDropTableProcedure { - - private static final Logger LOGGER = LoggerFactory.getLogger(SetTablePropertiesProcedure.class); - - private Map originalProperties = new HashMap<>(); - private Map updatedProperties; - - public SetTablePropertiesProcedure(final boolean isGeneratedByPipe) { - super(isGeneratedByPipe); - } - - public SetTablePropertiesProcedure( - final String database, - final String tableName, - final String queryId, - final Map properties, - final boolean isGeneratedByPipe) { - super(database, tableName, queryId, isGeneratedByPipe); - this.updatedProperties = properties; - } - - @Override - protected Flow executeFromState( - final ConfigNodeProcedureEnv env, final SetTablePropertiesState state) - throws InterruptedException { - final long startTime = System.currentTimeMillis(); - try { - switch (state) { - case VALIDATE_TABLE: - validateTable(env); - LOGGER.info( - "Validate table for table {}.{} when setting properties", database, tableName); - if (!isFailed() && Objects.isNull(table)) { - LOGGER.info( - "The updated table has the same properties with the original one. Skip the procedure."); - return Flow.NO_MORE_STATE; - } - break; - case PRE_RELEASE: - preRelease(env); - LOGGER.info( - "Pre release info for table {}.{} when setting properties", database, tableName); - break; - case SET_PROPERTIES: - setProperties(env); - LOGGER.info("Set properties to table {}.{}", database, tableName); - break; - case COMMIT_RELEASE: - commitRelease(env); - LOGGER.info( - "Commit release info of table {}.{} when setting properties", database, tableName); - return Flow.NO_MORE_STATE; - default: - setFailure(new ProcedureException("Unrecognized AddTableColumnState " + state)); - return Flow.NO_MORE_STATE; - } - return Flow.HAS_MORE_STATE; - } finally { - LOGGER.info( - "SetTableProperties-{}.{}-{} costs {}ms", - database, - tableName, - state, - (System.currentTimeMillis() - startTime)); - } - } - - private void validateTable(final ConfigNodeProcedureEnv env) { - try { - final Pair result = - env.getConfigManager() - .getClusterSchemaManager() - .updateTableProperties( - database, - tableName, - originalProperties, - updatedProperties, - this instanceof SetViewPropertiesProcedure); - final TSStatus status = result.getLeft(); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - setFailure(new ProcedureException(new IoTDBException(status))); - return; - } - table = result.getRight(); - setNextState(PRE_RELEASE); - } catch (final MetadataException e) { - setFailure(new ProcedureException(e)); - } - } - - @Override - protected void preRelease(final ConfigNodeProcedureEnv env) { - super.preRelease(env); - setNextState(SET_PROPERTIES); - } - - private void setProperties(final ConfigNodeProcedureEnv env) { - final TSStatus status = - env.getConfigManager() - .getClusterSchemaManager() - .executePlan( - this instanceof SetViewPropertiesProcedure - ? new SetViewPropertiesPlan(database, tableName, updatedProperties) - : new SetTablePropertiesPlan(database, tableName, updatedProperties), - isGeneratedByPipe); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - setFailure(new ProcedureException(new IoTDBException(status))); - } else { - setNextState(COMMIT_RELEASE); - } - } - - @Override - protected String getActionMessage() { - return "set table properties"; - } - - @Override - protected void rollbackState( - final ConfigNodeProcedureEnv env, final SetTablePropertiesState state) - throws IOException, InterruptedException, ProcedureException { - final long startTime = System.currentTimeMillis(); - try { - switch (state) { - case PRE_RELEASE: - LOGGER.info( - "Start rollback pre release info for table {}.{} when setting properties", - database, - table.getTableName()); - rollbackPreRelease(env); - break; - case SET_PROPERTIES: - LOGGER.info( - "Start rollback set properties to table {}.{}", database, table.getTableName()); - rollbackSetProperties(env); - break; - } - } finally { - LOGGER.info( - "Rollback SetTableProperties-{} costs {}ms.", - state, - (System.currentTimeMillis() - startTime)); - } - } - - private void rollbackSetProperties(final ConfigNodeProcedureEnv env) { - if (table == null) { - return; - } - final TSStatus status = - env.getConfigManager() - .getClusterSchemaManager() - .executePlan( - this instanceof SetViewPropertiesProcedure - ? new SetViewPropertiesPlan(database, tableName, originalProperties) - : new SetTablePropertiesPlan(database, tableName, originalProperties), - isGeneratedByPipe); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - setFailure(new ProcedureException(new IoTDBException(status))); - } - } - - @Override - protected SetTablePropertiesState getState(final int stateId) { - return SetTablePropertiesState.values()[stateId]; - } - - @Override - protected int getStateId(final SetTablePropertiesState state) { - return state.ordinal(); - } - - @Override - protected SetTablePropertiesState getInitialState() { - return VALIDATE_TABLE; - } - - @Override - public void serialize(final DataOutputStream stream) throws IOException { - stream.writeShort( - isGeneratedByPipe - ? ProcedureType.PIPE_ENRICHED_SET_TABLE_PROPERTIES_PROCEDURE.getTypeCode() - : ProcedureType.SET_TABLE_PROPERTIES_PROCEDURE.getTypeCode()); - innerSerialize(stream); - } - - protected void innerSerialize(final DataOutputStream stream) throws IOException { - super.serialize(stream); - - ReadWriteIOUtils.write(originalProperties, stream); - ReadWriteIOUtils.write(updatedProperties, stream); - } - - @Override - public void deserialize(final ByteBuffer byteBuffer) { - super.deserialize(byteBuffer); - - this.originalProperties = ReadWriteIOUtils.readMap(byteBuffer); - this.updatedProperties = ReadWriteIOUtils.readMap(byteBuffer); - } - - @Override - public boolean equals(final Object o) { - return super.equals(o) - && Objects.equals(updatedProperties, ((SetTablePropertiesProcedure) o).updatedProperties); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), updatedProperties); - } -} From 73b513d60c4ef4edf65d91896daf8601621ac5fa Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 16 Apr 2026 10:44:14 +0800 Subject: [PATCH 3/4] shop --- .../schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index 2f75914e51fa2..6fc556a1089da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -1339,7 +1339,7 @@ public int fillLastQueryMap( final Map> mapToFill, final PathPatternTree scope) throws MetadataException { - return mTree.fillLastQueryMap(pattern, mapToFill, scope); + return mtree.fillLastQueryMap(pattern, mapToFill, scope); } @Override From 4b1b39cf18097326753009b12343d83d7ed6622c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 16 Apr 2026 14:20:32 +0800 Subject: [PATCH 4/4] f --- .../java/org/apache/iotdb/session/it/IoTDBSessionQueryIT.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionQueryIT.java index 8ba5fcc617093..7ade93490a79d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionQueryIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionQueryIT.java @@ -42,7 +42,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static org.apache.iotdb.db.it.utils.TestUtils.assertResultSetEqual; import static org.junit.Assert.fail; @@ -293,7 +295,7 @@ public void lastQueryWithoutPermissionTest() throws IoTDBConnectionException { "40,root.sg1.d1.s5,aligned_test40,TEXT", "23,root.sg1.d1.s1,230000.0,FLOAT", "40,root.sg1.d1.s2,40,INT32")); - TestUtils.executeNonQuery(EnvFactory.getEnv(), "create user abcd 'veryComplexPassword@123'"); + TestUtils.executeNonQuery("create user abcd 'veryComplexPassword@123'"); try (final ISession session = EnvFactory.getEnv().getSessionConnection("abcd", "veryComplexPassword@123");