Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,8 @@ public void testInformationSchema() throws SQLException {
"plugin_name,STRING,TAG,",
"plugin_type,STRING,ATTRIBUTE,",
"class_name,STRING,ATTRIBUTE,",
"plugin_jar,STRING,ATTRIBUTE,")));
"plugin_jar,STRING,ATTRIBUTE,",
"exception_message,STRING,ATTRIBUTE,")));
TestUtils.assertResultSetEqual(
statement.executeQuery("desc topics"),
"ColumnName,DataType,Category,",
Expand Down Expand Up @@ -708,9 +709,9 @@ public void testInformationSchema() throws SQLException {
TestUtils.assertResultSetEqual(
statement.executeQuery(
"select * from pipe_plugins where plugin_name = 'IOTDB-THRIFT-SINK'"),
"plugin_name,plugin_type,class_name,plugin_jar,",
"plugin_name,plugin_type,class_name,plugin_jar,exception_message,",
Collections.singleton(
"IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink,null,"));
"IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink,null,null,"));

TestUtils.assertResultSetEqual(
statement.executeQuery("select * from views"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public PipePluginTableResp(
public TGetPipePluginTableResp convertToThriftResponse() throws IOException {
final List<ByteBuffer> pipePluginInformationByteBuffers = new ArrayList<>();
for (PipePluginMeta pipePluginMeta : allPipePluginMeta) {
pipePluginInformationByteBuffers.add(pipePluginMeta.serialize());
pipePluginInformationByteBuffers.add(pipePluginMeta.serializeForShowPipePlugin());
}
return new TGetPipePluginTableResp(status, pipePluginInformationByteBuffers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
Expand Down Expand Up @@ -111,6 +112,15 @@ public boolean validateBeforeCreatingPipePlugin(
final String pluginName, final boolean isSetIfNotExistsCondition) {
// both build-in and user defined pipe plugin should be unique
if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
final PipePluginMeta existedPipePluginMeta =
pipePluginMetaKeeper.getPipePluginMeta(pluginName);
final String loadingFailureMessage = existedPipePluginMeta.getPluginLoadingExceptionMessage();
if (loadingFailureMessage != null) {
throw new PipeException(
String.format(
"Failed to create PipePlugin [%s], this PipePlugin exists but failed to load: %s",
pluginName, loadingFailureMessage));
}
if (isSetIfNotExistsCondition) {
return true;
}
Expand Down Expand Up @@ -177,6 +187,7 @@ public void checkPipePluginExistence(
LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
checkPipePluginAvailabilityForPipeCreation(sourcePluginName, "source");

final PipeParameters processorParameters = new PipeParameters(processorAttributes);
final String processorPluginName =
Expand All @@ -190,6 +201,7 @@ public void checkPipePluginExistence(
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
checkPipePluginAvailabilityForPipeCreation(processorPluginName, "processor");

final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
final String sinkPluginName =
Expand All @@ -204,13 +216,14 @@ public void checkPipePluginExistence(
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
checkPipePluginAvailabilityForPipeCreation(sinkPluginName, "sink");
}

/////////////////////////////// Pipe Plugin Management ///////////////////////////////

public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan) {
final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
try {
final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
final String pluginName = pipePluginMeta.getPluginName();
final String className = pipePluginMeta.getClassName();
final String jarName = pipePluginMeta.getJarName();
Expand All @@ -220,6 +233,22 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
} else {
final String existed = pipePluginMetaKeeper.getPluginNameByJarName(jarName);
if (Objects.nonNull(existed)) {
final PipePluginMeta existedPipePluginMeta =
pipePluginMetaKeeper.getPipePluginMeta(existed);
final String existedLoadingFailureMessage =
existedPipePluginMeta.getPluginLoadingExceptionMessage();
if (existedLoadingFailureMessage != null) {
throw new PipeException(
String.format(
"Failed to create PipePlugin [%s], source PipePlugin [%s] failed to load: %s",
pluginName, existed, existedLoadingFailureMessage));
}
if (!pipePluginExecutableManager.hasPluginFileUnderInstallDir(existed, jarName)) {
throw new PipeException(
String.format(
"Failed to create PipePlugin [%s], source PipePlugin [%s] jar [%s] does not exist in install dir.",
pluginName, existed, jarName));
}
pipePluginExecutableManager.linkExistedPlugin(existed, pluginName, jarName);
computeFromPluginClass(pluginName, className);
} else {
Expand All @@ -237,7 +266,7 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5());

return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final Exception e) {
} catch (final Throwable e) {
final String errorMessage =
String.format(
"Failed to execute createPipePlugin(%s) on config nodes, because of %s",
Expand All @@ -249,7 +278,7 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
}

private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePluginPlan)
throws Exception {
throws Throwable {
final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
final String pluginName = pipePluginMeta.getPluginName();
final String className = pipePluginMeta.getClassName();
Expand All @@ -258,15 +287,15 @@ private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePlu
pipePluginExecutableManager.savePluginToInstallDir(
ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName);
computeFromPluginClass(pluginName, className);
} catch (final Exception e) {
} catch (final Throwable e) {
// We need to rollback if the creation has failed
pipePluginExecutableManager.removePluginFileUnderLibRoot(pluginName, jarName);
throw e;
}
}

private void computeFromPluginClass(final String pluginName, final String className)
throws Exception {
throws Throwable {
final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
final PipePluginClassLoader pipePluginClassLoader =
classLoaderManager.createPipePluginClassLoader(pluginDirPath);
Expand All @@ -275,7 +304,7 @@ private void computeFromPluginClass(final String pluginName, final String classN
pipePluginMetaKeeper.addPipePluginVisibility(
pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader);
} catch (final Exception e) {
} catch (final Throwable e) {
try {
pipePluginClassLoader.close();
} catch (final Exception ignored) {
Expand Down Expand Up @@ -402,37 +431,84 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException {
if (pipePluginMeta.isBuiltin()) {
continue;
}
final String pluginName = pipePluginMeta.getPluginName();
try {
final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
final PipePluginClassLoader pipePluginClassLoader =
classLoaderManager.createPipePluginClassLoader(pluginDirPath);
try {
final Class<?> pluginClass =
Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader);
pipePluginMetaKeeper.addPipePluginVisibility(
pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader);
} catch (final Throwable e) {
try {
pipePluginClassLoader.close();
} catch (final Exception ignored) {
}
throw e;
}
} catch (final Throwable e) {
LOGGER.warn(
"Failed to load plugin class for plugin [{}] when loading snapshot [{}] ",
pluginName,
snapshotFile.getAbsolutePath(),
e);
}
createPipePluginOnStartup(pipePluginMeta, snapshotFile);
}
} finally {
releasePipePluginInfoLock();
}
}

private String getRootCauseMessage(final Throwable throwable) {
Throwable current = throwable;
while (current.getCause() != null && current.getCause() != current) {
current = current.getCause();
}
final String message = current.getMessage();
return current.getClass().getSimpleName() + (message == null ? "" : (": " + message));
}

private void checkPipePluginAvailabilityForPipeCreation(
final String pluginName, final String pluginType) {
final PipePluginMeta pipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(pluginName);
final String loadingFailureMessage = pipePluginMeta.getPluginLoadingExceptionMessage();
if (loadingFailureMessage != null) {
final String exceptionMessage =
String.format(
"Failed to create or alter pipe, the pipe %s plugin %s failed to load: %s",
pluginType, pluginName, loadingFailureMessage);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
}

private void createPipePluginOnStartup(
final PipePluginMeta pipePluginMeta, final File snapshotFile) {
final String pluginName = pipePluginMeta.getPluginName();
try {
final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
final PipePluginClassLoader pipePluginClassLoader =
classLoaderManager.createPipePluginClassLoader(pluginDirPath);
try {
final Class<?> pluginClass =
Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader);
pipePluginMetaKeeper.addPipePluginVisibility(
pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
pipePluginMetaKeeper.addPipePluginMeta(
pluginName,
new PipePluginMeta(
pipePluginMeta.getPluginName(),
pipePluginMeta.getClassName(),
pipePluginMeta.isBuiltin(),
pipePluginMeta.getJarName(),
pipePluginMeta.getJarMD5(),
null));
classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader);
} catch (final Throwable e) {
try {
pipePluginClassLoader.close();
} catch (final Exception ignored) {
}
throw e;
}
} catch (final Throwable e) {
pipePluginMetaKeeper.addPipePluginMeta(
pluginName,
new PipePluginMeta(
pipePluginMeta.getPluginName(),
pipePluginMeta.getClassName(),
pipePluginMeta.isBuiltin(),
pipePluginMeta.getJarName(),
pipePluginMeta.getJarMD5(),
getRootCauseMessage(e)));
pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH);
LOGGER.warn(
"Failed to load plugin class for plugin [{}] when loading snapshot [{}] ",
pluginName,
snapshotFile.getAbsolutePath(),
e);
}
}

/////////////////////////////// hashCode & equals ///////////////////////////////

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator;
import org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
Expand All @@ -35,7 +36,6 @@
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.utils.ReadWriteIOUtils;
Expand All @@ -45,6 +45,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -162,8 +163,8 @@ private Flow executeFromLock(ConfigNodeProcedureEnv env) {
private Flow executeFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("DropPipePluginProcedure: executeFromDropOnDataNodes({})", pluginName);

if (RpcUtils.squashResponseStatusList(env.dropPipePluginOnDataNodes(pluginName, true)).getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
final List<TSStatus> dropStatusList = env.dropPipePluginOnDataNodes(pluginName, true);
if (dropStatusList.stream().allMatch(this::isDropPipePluginSuccessOrNotExists)) {
setNextState(DropPipePluginState.DROP_ON_CONFIG_NODES);
return Flow.HAS_MORE_STATE;
}
Expand All @@ -172,6 +173,18 @@ private Flow executeFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
String.format("Failed to drop pipe plugin %s on data nodes", pluginName));
}

private boolean isDropPipePluginSuccessOrNotExists(final TSStatus status) {
if (status == null) {
return false;
}
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return true;
}
final String message = status.getMessage();
return message != null
&& (message.contains("does not exist") || message.contains("not been created"));
}

private Flow executeFromDropOnConfigNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("DropPipePluginProcedure: executeFromDropOnConfigNodes({})", pluginName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testConvertToThriftResponse() throws IOException {

final List<ByteBuffer> pipePluginByteBuffers = new ArrayList<>();
for (PipePluginMeta pipePluginMeta : pipePluginMetaList) {
pipePluginByteBuffers.add(pipePluginMeta.serialize());
pipePluginByteBuffers.add(pipePluginMeta.serializeForShowPipePlugin());
}
TGetPipePluginTableResp getPipePluginTableResp =
new TGetPipePluginTableResp(status, pipePluginByteBuffers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader;
import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoaderManager;
import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
import org.apache.iotdb.db.pipe.agent.plugin.dataregion.PipeDataRegionPluginAgent;
import org.apache.iotdb.db.pipe.agent.plugin.schemaregion.PipeSchemaRegionPluginAgent;
Expand Down Expand Up @@ -176,6 +177,30 @@ public void doRegister(final PipePluginMeta pipePluginMeta) throws PipeException
}
}

public void markPluginLoadFailure(
final PipePluginMeta pipePluginMeta, final Throwable throwable) {
final String pluginName = pipePluginMeta.getPluginName();
pipePluginMetaKeeper.addPipePluginMeta(
pluginName,
new PipePluginMeta(
pipePluginMeta.getPluginName(),
pipePluginMeta.getClassName(),
pipePluginMeta.isBuiltin(),
pipePluginMeta.getJarName(),
pipePluginMeta.getJarMD5(),
getRootCauseMessage(throwable)));
pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH);
}

private String getRootCauseMessage(final Throwable throwable) {
Throwable current = throwable;
while (current.getCause() != null && current.getCause() != current) {
current = current.getCause();
}
final String message = current.getMessage();
return current.getClass().getSimpleName() + (message == null ? "" : (": " + message));
}

public void deregister(final String pluginName, final boolean needToDeleteJar)
throws PipeException {
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,20 @@ public static synchronized void launchPipePluginAgent(
}

// create instances of pipe plugins and do registration
try {
for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
if (meta.isBuiltin()) {
continue;
}
for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
if (meta.isBuiltin()) {
continue;
}
try {
PipeDataNodeAgent.plugin().doRegister(meta);
} catch (Throwable e) {
PipeDataNodeAgent.plugin().markPluginLoadFailure(meta, e);
// Ignore a single broken plugin and continue startup.
LOGGER.warn(
"Failure when register pipe plugin {}. Skip this plugin and continue startup.",
meta.getPluginName(),
e);
}
} catch (Throwable e) {
// Ignore the pipe plugin errors and continue to start
LOGGER.warn("Failure when register pipe plugins, will ignore.", e);
}
}

Expand Down
Loading