Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.it;

import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
Expand Down Expand Up @@ -57,6 +58,7 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -513,6 +515,7 @@ public void insertAndQuery() {
selectLast(httpClient);

queryV2(httpClient);
selectFastLast(httpClient);
queryGroupByLevelV2(httpClient);
queryRowLimitV2(httpClient);
queryShowChildPathsV2(httpClient);
Expand Down Expand Up @@ -923,6 +926,71 @@ public void queryWithWrongAuthorization() {
}
}

@Test
public void queryFastLastWithWrongAuthorization() {
CloseableHttpResponse response = null;

TestUtils.executeNonQuery("create user abcd 'strongPassword@1234'");
try {
final CloseableHttpClient httpClient = HttpClientBuilder.create().build();
final HttpPost httpPost = new HttpPost("http://127.0.0.1:" + port + "/rest/v2/fastLastQuery");
httpPost.addHeader("Content-type", "application/json; charset=utf-8");
httpPost.setHeader("Accept", "application/json");
final String authorization = getAuthorization("abcd", "strongPassword@1234");
httpPost.setHeader("Authorization", authorization);
final String sql = "{\"prefix_paths\":[\"root\",\"sg25\"]}";
httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
for (int i = 0; i < 30; i++) {
try {
response = httpClient.execute(httpPost);
break;
} catch (Exception e) {
if (i == 29) {
throw e;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}

Assert.assertEquals(200, response.getStatusLine().getStatusCode());
String message = EntityUtils.toString(response.getEntity(), "utf-8");
ObjectMapper mapper = new ObjectMapper();
Map map = mapper.readValue(message, Map.class);
List<Long> timestampsResult = (List<Long>) map.get("timestamps");
List<Long> expressionsResult = (List<Long>) map.get("expressions");
List<List<Object>> valuesResult = (List<List<Object>>) map.get("values");
Assert.assertTrue(map.size() > 0);
List<Object> expressions =
new ArrayList<Object>() {
{
add("Timeseries");
add("Value");
add("DataType");
}
};

Assert.assertEquals(expressions, expressionsResult);
Assert.assertEquals(Collections.emptyList(), timestampsResult);
Assert.assertEquals(Collections.emptyList(), valuesResult);
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}

public void query(CloseableHttpClient httpClient) {
CloseableHttpResponse response = null;
try {
Expand Down Expand Up @@ -1677,6 +1745,98 @@ public void queryV2(CloseableHttpClient httpClient) {
}
}

public void selectFastLast(CloseableHttpClient httpClient) {
// Only used in 1D scenarios
if (EnvFactory.getEnv().getDataNodeWrapperList().size() > 1) {
return;
}
CloseableHttpResponse response = null;
try {
HttpPost httpPost = getHttpPost("http://127.0.0.1:" + port + "/rest/v2/fastLastQuery");
String sql = "{\"prefix_paths\":[\"root\",\"sg25\"]}";
httpPost.setEntity(new StringEntity(sql, Charset.defaultCharset()));
response = httpClient.execute(httpPost);
HttpEntity responseEntity = response.getEntity();
String message = EntityUtils.toString(responseEntity, "utf-8");
ObjectMapper mapper = new ObjectMapper();
Map map = mapper.readValue(message, Map.class);
List<Long> timestampsResult = (List<Long>) map.get("timestamps");
List<Long> expressionsResult = (List<Long>) map.get("expressions");
List<List<Object>> valuesResult = (List<List<Object>>) map.get("values");
Assert.assertTrue(map.size() > 0);
List<Object> expressions =
new ArrayList<Object>() {
{
add("Timeseries");
add("Value");
add("DataType");
}
};
List<Object> timestamps =
new ArrayList<Object>() {
{
add(1635232153960l);
add(1635232153960l);
add(1635232153960l);
add(1635232143960l);
add(1635232153960l);
add(1635232153960l);
}
};
List<Object> values1 =
new ArrayList<Object>() {
{
add("root.sg25.s3");
add("root.sg25.s4");
add("root.sg25.s5");
add("root.sg25.s6");
add("root.sg25.s7");
add("root.sg25.s8");
}
};
List<Object> values2 =
new ArrayList<Object>() {
{
add("");
add("2");
add("1635000012345556");
add("1.41");
add("false");
add("3.5555");
}
};
List<Object> values3 =
new ArrayList<Object>() {
{
add("TEXT");
add("INT32");
add("INT64");
add("FLOAT");
add("BOOLEAN");
add("DOUBLE");
}
};

Assert.assertEquals(expressions, expressionsResult);
Assert.assertEquals(timestamps, timestampsResult);
Assert.assertEquals(values1, valuesResult.get(0));
Assert.assertEquals(values2, valuesResult.get(1));
Assert.assertEquals(values3, valuesResult.get(2));
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}

public void queryGroupByLevelV2(CloseableHttpClient httpClient) {
CloseableHttpResponse response = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,32 @@ public static void assertResultSetEqual(
}
}

public static void assertResultSetEqual(
SessionDataSet actualResultSet,
List<String> expectedColumnNames,
Set<String> expectedRetSet,
boolean ignoreTimeStamp) {
final Set<String> copiedSet = new HashSet<>(expectedRetSet);
try {
List<String> actualColumnNames = actualResultSet.getColumnNames();
if (ignoreTimeStamp) {
assertEquals(expectedColumnNames, actualColumnNames);
} else {
assertEquals(TIMESTAMP_STR, actualColumnNames.get(0));
assertEquals(expectedColumnNames, actualColumnNames.subList(1, actualColumnNames.size()));
}

while (actualResultSet.hasNext()) {
RowRecord rowRecord = actualResultSet.next();
assertTrue(copiedSet.remove(rowRecord.toString().replace('\t', ',')));
}
assertEquals(0, copiedSet.size());
} catch (IoTDBConnectionException | StatementExecutionException e) {
e.printStackTrace();
fail(e.getMessage());
}
}

public static void createUser(String userName, String password) {
createUser(EnvFactory.getEnv(), userName, password);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.it.utils.AlignedWriteUtil;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;

import org.junit.AfterClass;
Expand All @@ -40,7 +42,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.apache.iotdb.db.it.utils.TestUtils.assertResultSetEqual;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -244,6 +248,81 @@ public void lastQueryForOneDeviceNoSchema() throws IoTDBConnectionException {
}
}

@Test
public void lastQueryWithPrefixTest() throws IoTDBConnectionException {
// Only used in 1D scenarios
if (EnvFactory.getEnv().getDataNodeWrapperList().size() > 1) {
return;
}
final Set<String> retArray =
new HashSet<>(
Arrays.asList(
"30,root.sg1.d1.s3,30,INT64",
"30,root.sg1.d1.s4,false,BOOLEAN",
"40,root.sg1.d1.s5,aligned_test40,TEXT",
"23,root.sg1.d1.s1,230000.0,FLOAT",
"40,root.sg1.d1.s2,40,INT32"));

try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
// Push last cache first
try (final SessionDataSet resultSet =
session.executeFastLastDataQueryForOnePrefixPath(Arrays.asList("root", "sg1", "d1"))) {
assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true);
}

try (final SessionDataSet resultSet =
session.executeFastLastDataQueryForOnePrefixPath(Arrays.asList("root", "sg1", "d1"))) {
assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true);
}
} catch (StatementExecutionException | RedirectException e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void lastQueryWithoutPermissionTest() throws IoTDBConnectionException {
// Only used in 1D scenarios
if (EnvFactory.getEnv().getDataNodeWrapperList().size() > 1) {
return;
}
final String[] retArray = new String[] {};
final Set<String> retArray2 =
new HashSet<>(
Arrays.asList(
"30,root.sg1.d1.s3,30,INT64",
"30,root.sg1.d1.s4,false,BOOLEAN",
"40,root.sg1.d1.s5,aligned_test40,TEXT",
"23,root.sg1.d1.s1,230000.0,FLOAT",
"40,root.sg1.d1.s2,40,INT32"));
TestUtils.executeNonQuery("create user abcd 'veryComplexPassword@123'");

try (final ISession session =
EnvFactory.getEnv().getSessionConnection("abcd", "veryComplexPassword@123");
final ISession rootSession = EnvFactory.getEnv().getSessionConnection()) {
// Push last cache first
try (final SessionDataSet resultSet =
rootSession.executeFastLastDataQueryForOnePrefixPath(
Arrays.asList("root", "sg1", "d1"))) {
assertResultSetEqual(resultSet, lastQueryColumnNames, retArray2, true);
}

try (final SessionDataSet resultSet =
session.executeLastDataQueryForOneDevice(
"root.sg1", "root.sg1.d1", Arrays.asList("notExist", "s1"), true)) {
assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true);
}

try (final SessionDataSet resultSet =
session.executeFastLastDataQueryForOnePrefixPath(Arrays.asList("root", "sg1", "d1"))) {
assertResultSetEqual(resultSet, lastQueryColumnNames, retArray, true);
}
} catch (StatementExecutionException | RedirectException e) {
e.printStackTrace();
fail(e.getMessage());
}
}

// ------------------------------ Aggregation Query ------------------------------
@Test
public void aggregationQueryTest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.utils.CommonUtils;
Expand Down Expand Up @@ -102,7 +103,7 @@ public RestApiServiceImpl() {
public Response executeFastLastQueryStatement(
PrefixPathList prefixPathList, SecurityContext securityContext) {
Long queryId = null;
Statement statement = null;
QueryStatement statement = null;
boolean finish = false;
long startTime = System.nanoTime();

Expand All @@ -113,28 +114,31 @@ public Response executeFastLastQueryStatement(
new PartialPath(prefixPathList.getPrefixPaths().toArray(new String[0]));
final Map<PartialPath, Map<String, TimeValuePair>> resultMap = new HashMap<>();

// Check permission, the cost is rather low because the req only contains one prefix path
final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
final TSLastDataQueryReq tsLastDataQueryReq =
FastLastHandler.createTSLastDataQueryReq(clientSession, prefixPathList);
statement = StatementGenerator.createStatement(tsLastDataQueryReq);

final Response response = authorizationHandler.checkAuthority(securityContext, statement);
if (response != null) {
return response;
}

final String prefixString = prefixPath.toString();
for (ISchemaRegion region : SchemaEngine.getInstance().getAllSchemaRegions()) {
if (!prefixString.startsWith(region.getDatabaseFullPath())
&& !region.getDatabaseFullPath().startsWith(prefixString)) {
continue;
}
region.fillLastQueryMap(prefixPath, resultMap);
region.fillLastQueryMap(prefixPath, resultMap, statement.getAuthorityScope());
}
// Check cache first
if (!DataNodeSchemaCache.getInstance().getDeviceSchemaCache().getLastCache(resultMap)) {
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
TSLastDataQueryReq tsLastDataQueryReq =
FastLastHandler.createTSLastDataQueryReq(clientSession, prefixPathList);
statement = StatementGenerator.createStatement(tsLastDataQueryReq);

if (ExecuteStatementHandler.validateStatement(statement)) {
return FastLastHandler.buildErrorResponse(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}

Optional.ofNullable(authorizationHandler.checkAuthority(securityContext, statement))
.ifPresent(Response.class::cast);

queryId = SESSION_MANAGER.requestQueryId();
SessionInfo sessionInfo = SESSION_MANAGER.getSessionInfo(clientSession);

Expand Down
Loading
Loading