From b6a88c47a96e9bfb8c0b0712be3da540bbb22cc9 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 15 Apr 2026 15:43:56 +0800 Subject: [PATCH 01/13] Show pipe plugin loading errors in plugin listing Add plugin loading exception messages to pipe plugin metadata and expose them through SHOW PIPEPLUGINS and information_schema.pipe_plugins, so users can diagnose initialization and class loading failures directly from query results. Made-with: Cursor --- .../persistence/pipe/PipePluginInfo.java | 27 +++++++++++++++++ ...formationSchemaContentSupplierFactory.java | 6 ++++ .../config/metadata/ShowPipePluginsTask.java | 7 +++++ .../agent/plugin/meta/PipePluginMeta.java | 29 +++++++++++++++++-- .../schema/column/ColumnHeaderConstant.java | 3 +- .../schema/table/InformationSchema.java | 3 ++ 6 files changed, 72 insertions(+), 3 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index c7c138718f683..4d178b3321b77 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -412,6 +412,15 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException { Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader); pipePluginMetaKeeper.addPipePluginVisibility( pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass)); + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + null)); classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader); } catch (final Throwable e) { try { @@ -421,6 +430,15 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException { throw e; } } catch (final Throwable e) { + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + getRootCauseMessage(e))); LOGGER.warn( "Failed to load plugin class for plugin [{}] when loading snapshot [{}] ", pluginName, @@ -433,6 +451,15 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException { } } + private String getRootCauseMessage(final Throwable throwable) { + Throwable current = throwable; + while (current.getCause() != null && current.getCause() != current) { + current = current.getCause(); + } + final String message = current.getMessage(); + return current.getClass().getSimpleName() + (message == null ? "" : (": " + message)); + } + /////////////////////////////// hashCode & equals /////////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index d7162b03be316..a5c95db9ede79 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -706,6 +706,12 @@ protected void constructLine() { } else { columnBuilders[3].appendNull(); } + if (Objects.nonNull(pipePluginMeta.getPluginLoadingExceptionMessage())) { + columnBuilders[4].writeBinary( + BytesUtils.valueOf(pipePluginMeta.getPluginLoadingExceptionMessage())); + } else { + columnBuilders[4].appendNull(); + } resultBuilder.declarePosition(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java index f186702d595ce..cd6edab749421 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java @@ -50,6 +50,7 @@ public class ShowPipePluginsTask implements IConfigTask { public static final Binary PIPE_PLUGIN_TYPE_EXTERNAL = BytesUtils.valueOf("External"); private static final Binary PIPE_JAR_NAME_EMPTY_FIELD = BytesUtils.valueOf(""); + private static final Binary PIPE_PLUGIN_EXCEPTION_MESSAGE_EMPTY_FIELD = BytesUtils.valueOf(""); private final ShowPipePluginsStatement showPipePluginsStatement; @@ -103,6 +104,12 @@ public static void buildTsBlock( pipePluginMeta.getJarName() == null ? PIPE_JAR_NAME_EMPTY_FIELD : BytesUtils.valueOf(pipePluginMeta.getJarName())); + builder + .getColumnBuilder(4) + .writeBinary( + pipePluginMeta.getPluginLoadingExceptionMessage() == null + ? PIPE_PLUGIN_EXCEPTION_MESSAGE_EMPTY_FIELD + : BytesUtils.valueOf(pipePluginMeta.getPluginLoadingExceptionMessage())); builder.declarePosition(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java index 0fb2314a1cfca..1816bf855ffcf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java @@ -38,9 +38,20 @@ public class PipePluginMeta { private final boolean isBuiltin; private final String jarName; private final String jarMD5; + private final String pluginLoadingExceptionMessage; public PipePluginMeta( String pluginName, String className, boolean isBuiltin, String jarName, String jarMD5) { + this(pluginName, className, isBuiltin, jarName, jarMD5, null); + } + + public PipePluginMeta( + String pluginName, + String className, + boolean isBuiltin, + String jarName, + String jarMD5, + String pluginLoadingExceptionMessage) { this.pluginName = Objects.requireNonNull(pluginName).toUpperCase(); this.className = Objects.requireNonNull(className); @@ -52,6 +63,7 @@ public PipePluginMeta( this.jarName = Objects.requireNonNull(jarName); this.jarMD5 = Objects.requireNonNull(jarMD5); } + this.pluginLoadingExceptionMessage = pluginLoadingExceptionMessage; } public PipePluginMeta(String pluginName, String className) { @@ -61,6 +73,7 @@ public PipePluginMeta(String pluginName, String className) { this.isBuiltin = true; this.jarName = null; this.jarMD5 = null; + this.pluginLoadingExceptionMessage = null; } public boolean isBuiltin() { @@ -83,6 +96,10 @@ public String getJarMD5() { return jarMD5; } + public String getPluginLoadingExceptionMessage() { + return pluginLoadingExceptionMessage; + } + public ByteBuffer serialize() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); @@ -96,6 +113,7 @@ public void serialize(DataOutputStream outputStream) throws IOException { ReadWriteIOUtils.write(isBuiltin, outputStream); ReadWriteIOUtils.write(jarName, outputStream); ReadWriteIOUtils.write(jarMD5, outputStream); + ReadWriteIOUtils.write(pluginLoadingExceptionMessage, outputStream); } public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { @@ -104,7 +122,10 @@ public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer); final String jarName = ReadWriteIOUtils.readString(byteBuffer); final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); - return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5); + final String pluginLoadingExceptionMessage = + byteBuffer.hasRemaining() ? ReadWriteIOUtils.readString(byteBuffer) : null; + return new PipePluginMeta( + pluginName, className, isBuiltin, jarName, jarMD5, pluginLoadingExceptionMessage); } public static PipePluginMeta deserialize(InputStream inputStream) throws IOException { @@ -125,7 +146,8 @@ public boolean equals(Object obj) { && className.equals(that.className) && isBuiltin == that.isBuiltin && Objects.equals(jarName, that.jarName) - && Objects.equals(jarMD5, that.jarMD5); + && Objects.equals(jarMD5, that.jarMD5) + && Objects.equals(pluginLoadingExceptionMessage, that.pluginLoadingExceptionMessage); } @Override @@ -150,6 +172,9 @@ public String toString() { + ", jarMD5='" + jarMD5 + '\'' + + ", pluginLoadingExceptionMessage='" + + pluginLoadingExceptionMessage + + '\'' + '}'; } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index f436165f9e8c3..186f7daa6846d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -579,7 +579,8 @@ private ColumnHeaderConstant() { new ColumnHeader(PLUGIN_NAME, TSDataType.TEXT), new ColumnHeader(PLUGIN_TYPE, TSDataType.TEXT), new ColumnHeader(CLASS_NAME, TSDataType.TEXT), - new ColumnHeader(PLUGIN_JAR, TSDataType.TEXT)); + new ColumnHeader(PLUGIN_JAR, TSDataType.TEXT), + new ColumnHeader(EXCEPTION_MESSAGE, TSDataType.TEXT)); public static final List showSchemaTemplateHeaders = ImmutableList.of(new ColumnHeader(TEMPLATE_NAME, TSDataType.TEXT)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java index 98bf4a9a83023..75b71efc75d0c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java @@ -230,6 +230,9 @@ public class InformationSchema { new AttributeColumnSchema(ColumnHeaderConstant.CLASS_NAME_TABLE_MODEL, TSDataType.STRING)); pipePluginTable.addColumnSchema( new AttributeColumnSchema(ColumnHeaderConstant.PLUGIN_JAR_TABLE_MODEL, TSDataType.STRING)); + pipePluginTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.EXCEPTION_MESSAGE_TABLE_MODEL, TSDataType.STRING)); schemaTables.put(PIPE_PLUGINS, pipePluginTable); final TsTable topicTable = new TsTable(TOPICS); From 125b762f7f17803e27f6582e839ee5961a5d8e66 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 15 Apr 2026 18:25:43 +0800 Subject: [PATCH 02/13] spotless --- .../persistence/pipe/PipePluginInfo.java | 178 +++++++++++++----- .../pipe/plugin/DropPipePluginProcedure.java | 19 +- .../agent/plugin/PipeDataNodePluginAgent.java | 25 +++ .../pipe/agent/runtime/PipeAgentLauncher.java | 20 +- .../constructor/PipePluginConstructor.java | 8 + .../plugin/meta/PipePluginMetaKeeper.java | 11 +- 6 files changed, 200 insertions(+), 61 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 4d178b3321b77..3faeb4fdda57e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -29,6 +29,7 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; +import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility; import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; @@ -111,6 +112,15 @@ public boolean validateBeforeCreatingPipePlugin( final String pluginName, final boolean isSetIfNotExistsCondition) { // both build-in and user defined pipe plugin should be unique if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) { + final PipePluginMeta existedPipePluginMeta = + pipePluginMetaKeeper.getPipePluginMeta(pluginName); + final String loadingFailureMessage = existedPipePluginMeta.getPluginLoadingExceptionMessage(); + if (loadingFailureMessage != null) { + throw new PipeException( + String.format( + "Failed to create PipePlugin [%s], this PipePlugin exists but failed to load: %s", + pluginName, loadingFailureMessage)); + } if (isSetIfNotExistsCondition) { return true; } @@ -177,6 +187,7 @@ public void checkPipePluginExistence( LOGGER.info(exceptionMessage); throw new PipeException(exceptionMessage); } + checkPipePluginAvailabilityForPipeCreation(sourcePluginName, "extractor"); final PipeParameters processorParameters = new PipeParameters(processorAttributes); final String processorPluginName = @@ -190,6 +201,7 @@ public void checkPipePluginExistence( LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } + checkPipePluginAvailabilityForPipeCreation(processorPluginName, "processor"); final PipeParameters sinkParameters = new PipeParameters(sinkAttributes); final String sinkPluginName = @@ -204,22 +216,52 @@ public void checkPipePluginExistence( LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } + checkPipePluginAvailabilityForPipeCreation(sinkPluginName, "connector"); } /////////////////////////////// Pipe Plugin Management /////////////////////////////// public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan) { + boolean shouldRecordLoadingFailure = false; + final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta(); try { - final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta(); final String pluginName = pipePluginMeta.getPluginName(); final String className = pipePluginMeta.getClassName(); final String jarName = pipePluginMeta.getJarName(); if (createPipePluginPlan.getJarFile() != null) { + shouldRecordLoadingFailure = true; savePipePluginWithRollback(createPipePluginPlan); } else { final String existed = pipePluginMetaKeeper.getPluginNameByJarName(jarName); if (Objects.nonNull(existed)) { + final PipePluginMeta existedPipePluginMeta = + pipePluginMetaKeeper.getPipePluginMeta(existed); + final String existedLoadingFailureMessage = + existedPipePluginMeta.getPluginLoadingExceptionMessage(); + if (existedLoadingFailureMessage != null) { + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + existedLoadingFailureMessage)); + pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH); + throw new PipeException( + String.format( + "Failed to create PipePlugin [%s], source PipePlugin [%s] failed to load: %s", + pluginName, existed, existedLoadingFailureMessage)); + } + if (!pipePluginExecutableManager.hasPluginFileUnderInstallDir(existed, jarName)) { + throw new PipeException( + String.format( + "Failed to create PipePlugin [%s], source PipePlugin [%s] jar [%s] does not exist in install dir.", + pluginName, existed, jarName)); + } + shouldRecordLoadingFailure = true; pipePluginExecutableManager.linkExistedPlugin(existed, pluginName, jarName); computeFromPluginClass(pluginName, className); } else { @@ -237,7 +279,10 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - } catch (final Exception e) { + } catch (final Throwable e) { + if (shouldRecordLoadingFailure) { + savePipePluginLoadingFailure(pipePluginMeta, e); + } final String errorMessage = String.format( "Failed to execute createPipePlugin(%s) on config nodes, because of %s", @@ -249,7 +294,7 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan } private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePluginPlan) - throws Exception { + throws Throwable { final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta(); final String pluginName = pipePluginMeta.getPluginName(); final String className = pipePluginMeta.getClassName(); @@ -258,7 +303,7 @@ private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePlu pipePluginExecutableManager.savePluginToInstallDir( ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName); computeFromPluginClass(pluginName, className); - } catch (final Exception e) { + } catch (final Throwable e) { // We need to rollback if the creation has failed pipePluginExecutableManager.removePluginFileUnderLibRoot(pluginName, jarName); throw e; @@ -266,7 +311,7 @@ private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePlu } private void computeFromPluginClass(final String pluginName, final String className) - throws Exception { + throws Throwable { final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName); final PipePluginClassLoader pipePluginClassLoader = classLoaderManager.createPipePluginClassLoader(pluginDirPath); @@ -275,7 +320,7 @@ private void computeFromPluginClass(final String pluginName, final String classN pipePluginMetaKeeper.addPipePluginVisibility( pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass)); classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader); - } catch (final Exception e) { + } catch (final Throwable e) { try { pipePluginClassLoader.close(); } catch (final Exception ignored) { @@ -402,49 +447,7 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException { if (pipePluginMeta.isBuiltin()) { continue; } - final String pluginName = pipePluginMeta.getPluginName(); - try { - final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName); - final PipePluginClassLoader pipePluginClassLoader = - classLoaderManager.createPipePluginClassLoader(pluginDirPath); - try { - final Class pluginClass = - Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader); - pipePluginMetaKeeper.addPipePluginVisibility( - pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass)); - pipePluginMetaKeeper.addPipePluginMeta( - pluginName, - new PipePluginMeta( - pipePluginMeta.getPluginName(), - pipePluginMeta.getClassName(), - pipePluginMeta.isBuiltin(), - pipePluginMeta.getJarName(), - pipePluginMeta.getJarMD5(), - null)); - classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader); - } catch (final Throwable e) { - try { - pipePluginClassLoader.close(); - } catch (final Exception ignored) { - } - throw e; - } - } catch (final Throwable e) { - pipePluginMetaKeeper.addPipePluginMeta( - pluginName, - new PipePluginMeta( - pipePluginMeta.getPluginName(), - pipePluginMeta.getClassName(), - pipePluginMeta.isBuiltin(), - pipePluginMeta.getJarName(), - pipePluginMeta.getJarMD5(), - getRootCauseMessage(e))); - LOGGER.warn( - "Failed to load plugin class for plugin [{}] when loading snapshot [{}] ", - pluginName, - snapshotFile.getAbsolutePath(), - e); - } + createPipePluginOnStartup(pipePluginMeta, snapshotFile); } } finally { releasePipePluginInfoLock(); @@ -460,6 +463,83 @@ private String getRootCauseMessage(final Throwable throwable) { return current.getClass().getSimpleName() + (message == null ? "" : (": " + message)); } + private void savePipePluginLoadingFailure( + final PipePluginMeta pipePluginMeta, final Throwable throwable) { + final String pluginName = pipePluginMeta.getPluginName(); + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + getRootCauseMessage(throwable))); + pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH); + } + + private void checkPipePluginAvailabilityForPipeCreation( + final String pluginName, final String pluginType) { + final PipePluginMeta pipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(pluginName); + final String loadingFailureMessage = pipePluginMeta.getPluginLoadingExceptionMessage(); + if (loadingFailureMessage != null) { + final String exceptionMessage = + String.format( + "Failed to create or alter pipe, the pipe %s plugin %s failed to load: %s", + pluginType, pluginName, loadingFailureMessage); + LOGGER.warn(exceptionMessage); + throw new PipeException(exceptionMessage); + } + } + + private void createPipePluginOnStartup( + final PipePluginMeta pipePluginMeta, final File snapshotFile) { + final String pluginName = pipePluginMeta.getPluginName(); + try { + final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName); + final PipePluginClassLoader pipePluginClassLoader = + classLoaderManager.createPipePluginClassLoader(pluginDirPath); + try { + final Class pluginClass = + Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader); + pipePluginMetaKeeper.addPipePluginVisibility( + pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass)); + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + null)); + classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader); + } catch (final Throwable e) { + try { + pipePluginClassLoader.close(); + } catch (final Exception ignored) { + } + throw e; + } + } catch (final Throwable e) { + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + getRootCauseMessage(e))); + pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH); + LOGGER.warn( + "Failed to load plugin class for plugin [{}] when loading snapshot [{}] ", + pluginName, + snapshotFile.getAbsolutePath(), + e); + } + } + /////////////////////////////// hashCode & equals /////////////////////////////// @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index efbe1ee6ccdda..ab48a2478506c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.plugin; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan; import org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator; import org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator; @@ -35,7 +36,6 @@ import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -45,6 +45,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -162,8 +163,8 @@ private Flow executeFromLock(ConfigNodeProcedureEnv env) { private Flow executeFromDropOnDataNodes(ConfigNodeProcedureEnv env) { LOGGER.info("DropPipePluginProcedure: executeFromDropOnDataNodes({})", pluginName); - if (RpcUtils.squashResponseStatusList(env.dropPipePluginOnDataNodes(pluginName, true)).getCode() - == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + final List dropStatusList = env.dropPipePluginOnDataNodes(pluginName, true); + if (dropStatusList.stream().allMatch(this::isDropPipePluginSuccessOrNotExists)) { setNextState(DropPipePluginState.DROP_ON_CONFIG_NODES); return Flow.HAS_MORE_STATE; } @@ -172,6 +173,18 @@ private Flow executeFromDropOnDataNodes(ConfigNodeProcedureEnv env) { String.format("Failed to drop pipe plugin %s on data nodes", pluginName)); } + private boolean isDropPipePluginSuccessOrNotExists(final TSStatus status) { + if (status == null) { + return false; + } + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return true; + } + final String message = status.getMessage(); + return message != null + && (message.contains("does not exist") || message.contains("not been created")); + } + private Flow executeFromDropOnConfigNodes(ConfigNodeProcedureEnv env) { LOGGER.info("DropPipePluginProcedure: executeFromDropOnConfigNodes({})", pluginName); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java index d57956109d46d..567893c2cbe09 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader; import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoaderManager; import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager; +import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility; import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils; import org.apache.iotdb.db.pipe.agent.plugin.dataregion.PipeDataRegionPluginAgent; import org.apache.iotdb.db.pipe.agent.plugin.schemaregion.PipeSchemaRegionPluginAgent; @@ -176,6 +177,30 @@ public void doRegister(final PipePluginMeta pipePluginMeta) throws PipeException } } + public void markPluginLoadFailure( + final PipePluginMeta pipePluginMeta, final Throwable throwable) { + final String pluginName = pipePluginMeta.getPluginName(); + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + getRootCauseMessage(throwable))); + pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH); + } + + private String getRootCauseMessage(final Throwable throwable) { + Throwable current = throwable; + while (current.getCause() != null && current.getCause() != current) { + current = current.getCause(); + } + final String message = current.getMessage(); + return current.getClass().getSimpleName() + (message == null ? "" : (": " + message)); + } + public void deregister(final String pluginName, final boolean needToDeleteJar) throws PipeException { lock.lock(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java index 5a408a319e7f0..286c8a5eaebfe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java @@ -83,16 +83,20 @@ public static synchronized void launchPipePluginAgent( } // create instances of pipe plugins and do registration - try { - for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) { - if (meta.isBuiltin()) { - continue; - } + for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) { + if (meta.isBuiltin()) { + continue; + } + try { PipeDataNodeAgent.plugin().doRegister(meta); + } catch (Throwable e) { + PipeDataNodeAgent.plugin().markPluginLoadFailure(meta, e); + // Ignore a single broken plugin and continue startup. + LOGGER.warn( + "Failure when register pipe plugin {}. Skip this plugin and continue startup.", + meta.getPluginName(), + e); } - } catch (Throwable e) { - // Ignore the pipe plugin errors and continue to start - LOGGER.warn("Failure when register pipe plugins, will ignore.", e); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java index cbb25340f0099..6e87444c2d935 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java @@ -81,6 +81,14 @@ private PipePlugin reflect(String pluginName) { LOGGER.warn(errorMessage); throw new PipeException(errorMessage); } + if (information.getPluginLoadingExceptionMessage() != null) { + final String errorMessage = + String.format( + "Failed to reflect PipePlugin instance, because PipePlugin %s failed to load: %s", + pluginName.toUpperCase(), information.getPluginLoadingExceptionMessage()); + LOGGER.warn(errorMessage); + throw new PipeException(errorMessage); + } try { final Class pluginClass = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java index 6f4ee1caa2f95..43fb0c5909d32 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java @@ -125,7 +125,16 @@ protected void processTakeSnapshot(OutputStream outputStream) throws IOException if (pipePluginMeta.isBuiltin()) { continue; } - ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream); + ReadWriteIOUtils.write( + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + null) + .serialize(), + outputStream); } } From edd76d5bfaed804b205249fafdbdf95cb97c027a Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 15 Apr 2026 18:57:37 +0800 Subject: [PATCH 03/13] spotless --- .../commons/pipe/agent/plugin/meta/PipePluginMeta.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java index 1816bf855ffcf..aa91eb7df5a12 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java @@ -113,7 +113,6 @@ public void serialize(DataOutputStream outputStream) throws IOException { ReadWriteIOUtils.write(isBuiltin, outputStream); ReadWriteIOUtils.write(jarName, outputStream); ReadWriteIOUtils.write(jarMD5, outputStream); - ReadWriteIOUtils.write(pluginLoadingExceptionMessage, outputStream); } public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { @@ -122,10 +121,8 @@ public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer); final String jarName = ReadWriteIOUtils.readString(byteBuffer); final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); - final String pluginLoadingExceptionMessage = - byteBuffer.hasRemaining() ? ReadWriteIOUtils.readString(byteBuffer) : null; return new PipePluginMeta( - pluginName, className, isBuiltin, jarName, jarMD5, pluginLoadingExceptionMessage); + pluginName, className, isBuiltin, jarName, jarMD5, null); } public static PipePluginMeta deserialize(InputStream inputStream) throws IOException { @@ -146,8 +143,7 @@ public boolean equals(Object obj) { && className.equals(that.className) && isBuiltin == that.isBuiltin && Objects.equals(jarName, that.jarName) - && Objects.equals(jarMD5, that.jarMD5) - && Objects.equals(pluginLoadingExceptionMessage, that.pluginLoadingExceptionMessage); + && Objects.equals(jarMD5, that.jarMD5); } @Override From 6015e477e2eee1b792c25896efab35d77d627b5c Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 15 Apr 2026 19:06:54 +0800 Subject: [PATCH 04/13] fix --- .../iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java index aa91eb7df5a12..2c0ab82d026ca 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java @@ -121,8 +121,7 @@ public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer); final String jarName = ReadWriteIOUtils.readString(byteBuffer); final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); - return new PipePluginMeta( - pluginName, className, isBuiltin, jarName, jarMD5, null); + return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5, null); } public static PipePluginMeta deserialize(InputStream inputStream) throws IOException { From 178ec5376e41b6f44788733e710091a6a4ad2e6e Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 16 Apr 2026 10:01:46 +0800 Subject: [PATCH 05/13] fix --- .../iotdb/confignode/persistence/pipe/PipePluginInfo.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 3faeb4fdda57e..0eb10298ba1a1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -187,7 +187,7 @@ public void checkPipePluginExistence( LOGGER.info(exceptionMessage); throw new PipeException(exceptionMessage); } - checkPipePluginAvailabilityForPipeCreation(sourcePluginName, "extractor"); + checkPipePluginAvailabilityForPipeCreation(sourcePluginName, "source"); final PipeParameters processorParameters = new PipeParameters(processorAttributes); final String processorPluginName = @@ -216,7 +216,7 @@ public void checkPipePluginExistence( LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } - checkPipePluginAvailabilityForPipeCreation(sinkPluginName, "connector"); + checkPipePluginAvailabilityForPipeCreation(sinkPluginName, "sink"); } /////////////////////////////// Pipe Plugin Management /////////////////////////////// From 75d4b869767a6b209e41c9c2a15707096e95d727 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 16 Apr 2026 10:50:42 +0800 Subject: [PATCH 06/13] fix --- .../relational/it/schema/IoTDBDatabaseIT.java | 7 ++++--- .../persistence/pipe/PipePluginInfo.java | 18 ------------------ 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java index 398830efaec56..3ff36d0cad9b2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java @@ -490,7 +490,8 @@ public void testInformationSchema() throws SQLException { "plugin_name,STRING,TAG,", "plugin_type,STRING,ATTRIBUTE,", "class_name,STRING,ATTRIBUTE,", - "plugin_jar,STRING,ATTRIBUTE,"))); + "plugin_jar,STRING,ATTRIBUTE,", + "exception_message,STRING,ATTRIBUTE,"))); TestUtils.assertResultSetEqual( statement.executeQuery("desc topics"), "ColumnName,DataType,Category,", @@ -708,9 +709,9 @@ public void testInformationSchema() throws SQLException { TestUtils.assertResultSetEqual( statement.executeQuery( "select * from pipe_plugins where plugin_name = 'IOTDB-THRIFT-SINK'"), - "plugin_name,plugin_type,class_name,plugin_jar,", + "plugin_name,plugin_type,class_name,plugin_jar,exception_message,", Collections.singleton( - "IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink,null,")); + "IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink,null,null,")); TestUtils.assertResultSetEqual( statement.executeQuery("select * from views"), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 0eb10298ba1a1..9a05351d08337 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -280,9 +280,6 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (final Throwable e) { - if (shouldRecordLoadingFailure) { - savePipePluginLoadingFailure(pipePluginMeta, e); - } final String errorMessage = String.format( "Failed to execute createPipePlugin(%s) on config nodes, because of %s", @@ -463,21 +460,6 @@ private String getRootCauseMessage(final Throwable throwable) { return current.getClass().getSimpleName() + (message == null ? "" : (": " + message)); } - private void savePipePluginLoadingFailure( - final PipePluginMeta pipePluginMeta, final Throwable throwable) { - final String pluginName = pipePluginMeta.getPluginName(); - pipePluginMetaKeeper.addPipePluginMeta( - pluginName, - new PipePluginMeta( - pipePluginMeta.getPluginName(), - pipePluginMeta.getClassName(), - pipePluginMeta.isBuiltin(), - pipePluginMeta.getJarName(), - pipePluginMeta.getJarMD5(), - getRootCauseMessage(throwable))); - pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH); - } - private void checkPipePluginAvailabilityForPipeCreation( final String pluginName, final String pluginType) { final PipePluginMeta pipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(pluginName); From dd7cbc88a9c3018a357d06155b9933822ea370f1 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 16 Apr 2026 11:27:42 +0800 Subject: [PATCH 07/13] fix --- .../persistence/pipe/PipePluginInfo.java | 13 --------- .../agent/plugin/meta/PipePluginMeta.java | 27 +++++++++++++++++-- .../plugin/meta/PipePluginMetaKeeper.java | 11 +------- 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 9a05351d08337..30771e78e4696 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -222,7 +222,6 @@ public void checkPipePluginExistence( /////////////////////////////// Pipe Plugin Management /////////////////////////////// public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan) { - boolean shouldRecordLoadingFailure = false; final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta(); try { final String pluginName = pipePluginMeta.getPluginName(); @@ -230,7 +229,6 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan final String jarName = pipePluginMeta.getJarName(); if (createPipePluginPlan.getJarFile() != null) { - shouldRecordLoadingFailure = true; savePipePluginWithRollback(createPipePluginPlan); } else { final String existed = pipePluginMetaKeeper.getPluginNameByJarName(jarName); @@ -240,16 +238,6 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan final String existedLoadingFailureMessage = existedPipePluginMeta.getPluginLoadingExceptionMessage(); if (existedLoadingFailureMessage != null) { - pipePluginMetaKeeper.addPipePluginMeta( - pluginName, - new PipePluginMeta( - pipePluginMeta.getPluginName(), - pipePluginMeta.getClassName(), - pipePluginMeta.isBuiltin(), - pipePluginMeta.getJarName(), - pipePluginMeta.getJarMD5(), - existedLoadingFailureMessage)); - pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH); throw new PipeException( String.format( "Failed to create PipePlugin [%s], source PipePlugin [%s] failed to load: %s", @@ -261,7 +249,6 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan "Failed to create PipePlugin [%s], source PipePlugin [%s] jar [%s] does not exist in install dir.", pluginName, existed, jarName)); } - shouldRecordLoadingFailure = true; pipePluginExecutableManager.linkExistedPlugin(existed, pluginName, jarName); computeFromPluginClass(pluginName, className); } else { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java index 2c0ab82d026ca..31fb17b309481 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java @@ -101,18 +101,38 @@ public String getPluginLoadingExceptionMessage() { } public ByteBuffer serialize() throws IOException { + return serialize(true); + } + + public ByteBuffer serializeForState() throws IOException { + return serialize(false); + } + + private ByteBuffer serialize(final boolean includeExceptionMessage) throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); - serialize(outputStream); + serialize(outputStream, includeExceptionMessage); return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } public void serialize(DataOutputStream outputStream) throws IOException { + serialize(outputStream, true); + } + + public void serializeForState(DataOutputStream outputStream) throws IOException { + serialize(outputStream, false); + } + + private void serialize(DataOutputStream outputStream, boolean includeExceptionMessage) + throws IOException { ReadWriteIOUtils.write(pluginName, outputStream); ReadWriteIOUtils.write(className, outputStream); ReadWriteIOUtils.write(isBuiltin, outputStream); ReadWriteIOUtils.write(jarName, outputStream); ReadWriteIOUtils.write(jarMD5, outputStream); + if (includeExceptionMessage) { + ReadWriteIOUtils.write(pluginLoadingExceptionMessage, outputStream); + } } public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { @@ -121,7 +141,10 @@ public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer); final String jarName = ReadWriteIOUtils.readString(byteBuffer); final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); - return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5, null); + final String pluginLoadingExceptionMessage = + byteBuffer.hasRemaining() ? ReadWriteIOUtils.readString(byteBuffer) : null; + return new PipePluginMeta( + pluginName, className, isBuiltin, jarName, jarMD5, pluginLoadingExceptionMessage); } public static PipePluginMeta deserialize(InputStream inputStream) throws IOException { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java index 43fb0c5909d32..90a42d2376993 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java @@ -125,16 +125,7 @@ protected void processTakeSnapshot(OutputStream outputStream) throws IOException if (pipePluginMeta.isBuiltin()) { continue; } - ReadWriteIOUtils.write( - new PipePluginMeta( - pipePluginMeta.getPluginName(), - pipePluginMeta.getClassName(), - pipePluginMeta.isBuiltin(), - pipePluginMeta.getJarName(), - pipePluginMeta.getJarMD5(), - null) - .serialize(), - outputStream); + ReadWriteIOUtils.write(pipePluginMeta.serializeForState(), outputStream); } } From 497ed8cf8c28a0138675ff7f67e4360bcdce9874 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 16 Apr 2026 11:43:27 +0800 Subject: [PATCH 08/13] fix --- .../pipe/plugin/PipePluginTableResp.java | 2 +- ...formationSchemaContentSupplierFactory.java | 2 +- .../config/metadata/ShowPipePluginsTask.java | 3 +- .../agent/plugin/meta/PipePluginMeta.java | 46 +++++++++---------- 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java index 534b824ef47df..93642f50312b9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java @@ -51,7 +51,7 @@ public PipePluginTableResp( public TGetPipePluginTableResp convertToThriftResponse() throws IOException { final List pipePluginInformationByteBuffers = new ArrayList<>(); for (PipePluginMeta pipePluginMeta : allPipePluginMeta) { - pipePluginInformationByteBuffers.add(pipePluginMeta.serialize()); + pipePluginInformationByteBuffers.add(pipePluginMeta.serializeForShowPipePlugin()); } return new TGetPipePluginTableResp(status, pipePluginInformationByteBuffers); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index a5c95db9ede79..5b51c00942bb7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -685,7 +685,7 @@ private PipePluginSupplier(final List dataTypes, final UserEntity en ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { iterator = client.getPipePluginTable().getAllPipePluginMeta().stream() - .map(PipePluginMeta::deserialize) + .map(PipePluginMeta::deserializeForShowPipePlugin) .filter( pipePluginMeta -> !BuiltinPipePlugin.SHOW_PIPE_PLUGINS_BLACKLIST.contains( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java index cd6edab749421..bffc9fe43d9fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java @@ -75,7 +75,8 @@ public static void buildTsBlock( final List pipePluginMetaList = new ArrayList<>(); if (allPipePluginsInformation != null) { for (final ByteBuffer pipePluginInformationByteBuffer : allPipePluginsInformation) { - pipePluginMetaList.add(PipePluginMeta.deserialize(pipePluginInformationByteBuffer)); + pipePluginMetaList.add( + PipePluginMeta.deserializeForShowPipePlugin(pipePluginInformationByteBuffer)); } } pipePluginMetaList.sort(Comparator.comparing(PipePluginMeta::getPluginName)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java index 31fb17b309481..19f6863be3ca3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java @@ -101,38 +101,30 @@ public String getPluginLoadingExceptionMessage() { } public ByteBuffer serialize() throws IOException { - return serialize(true); - } - - public ByteBuffer serializeForState() throws IOException { - return serialize(false); + PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + serialize(outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } - private ByteBuffer serialize(final boolean includeExceptionMessage) throws IOException { + public ByteBuffer serializeForShowPipePlugin() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); - serialize(outputStream, includeExceptionMessage); + serializeForShowPipePlugin(outputStream); return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } public void serialize(DataOutputStream outputStream) throws IOException { - serialize(outputStream, true); - } - - public void serializeForState(DataOutputStream outputStream) throws IOException { - serialize(outputStream, false); - } - - private void serialize(DataOutputStream outputStream, boolean includeExceptionMessage) - throws IOException { ReadWriteIOUtils.write(pluginName, outputStream); ReadWriteIOUtils.write(className, outputStream); ReadWriteIOUtils.write(isBuiltin, outputStream); ReadWriteIOUtils.write(jarName, outputStream); ReadWriteIOUtils.write(jarMD5, outputStream); - if (includeExceptionMessage) { - ReadWriteIOUtils.write(pluginLoadingExceptionMessage, outputStream); - } + } + + public void serializeForShowPipePlugin(DataOutputStream outputStream) throws IOException { + serialize(outputStream); + ReadWriteIOUtils.write(pluginLoadingExceptionMessage, outputStream); } public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { @@ -141,10 +133,7 @@ public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer); final String jarName = ReadWriteIOUtils.readString(byteBuffer); final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); - final String pluginLoadingExceptionMessage = - byteBuffer.hasRemaining() ? ReadWriteIOUtils.readString(byteBuffer) : null; - return new PipePluginMeta( - pluginName, className, isBuiltin, jarName, jarMD5, pluginLoadingExceptionMessage); + return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5, null); } public static PipePluginMeta deserialize(InputStream inputStream) throws IOException { @@ -152,6 +141,17 @@ public static PipePluginMeta deserialize(InputStream inputStream) throws IOExcep ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream))); } + public static PipePluginMeta deserializeForShowPipePlugin(ByteBuffer byteBuffer) { + final String pluginName = ReadWriteIOUtils.readString(byteBuffer); + final String className = ReadWriteIOUtils.readString(byteBuffer); + final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer); + final String jarName = ReadWriteIOUtils.readString(byteBuffer); + final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); + final String pluginLoadingExceptionMessage = ReadWriteIOUtils.readString(byteBuffer); + return new PipePluginMeta( + pluginName, className, isBuiltin, jarName, jarMD5, pluginLoadingExceptionMessage); + } + @Override public boolean equals(Object obj) { if (this == obj) { From e84c7ffd03b436c34c63a4bfeeb22c42cfb48cbf Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 16 Apr 2026 11:50:08 +0800 Subject: [PATCH 09/13] fix --- .../commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java index 90a42d2376993..6f4ee1caa2f95 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java @@ -125,7 +125,7 @@ protected void processTakeSnapshot(OutputStream outputStream) throws IOException if (pipePluginMeta.isBuiltin()) { continue; } - ReadWriteIOUtils.write(pipePluginMeta.serializeForState(), outputStream); + ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream); } } From f8ace87381b210461d055457c6aea9f827958b61 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 16 Apr 2026 12:16:14 +0800 Subject: [PATCH 10/13] fix --- .../consensus/response/pipe/PipePluginTableRespTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java index 64551f7920347..146a6e14d0bf7 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java @@ -49,7 +49,7 @@ public void testConvertToThriftResponse() throws IOException { final List pipePluginByteBuffers = new ArrayList<>(); for (PipePluginMeta pipePluginMeta : pipePluginMetaList) { - pipePluginByteBuffers.add(pipePluginMeta.serialize()); + pipePluginByteBuffers.add(pipePluginMeta.serializeForShowPipePlugin()); } TGetPipePluginTableResp getPipePluginTableResp = new TGetPipePluginTableResp(status, pipePluginByteBuffers); From 7a610fade023ed811632213f831b22dde60ff048 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Fri, 17 Apr 2026 10:00:35 +0800 Subject: [PATCH 11/13] fix --- .../auto/basic/IoTDBPipeSyntaxIT.java | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java index 87bc7b0e95bd2..0c92c1d0b7916 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java @@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.isession.SessionConfig; +import org.apache.iotdb.it.env.cluster.env.AbstractEnv; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic; @@ -38,8 +39,19 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import javax.tools.JavaCompiler; +import javax.tools.ToolProvider; + import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -47,6 +59,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; import static org.junit.Assert.fail; @@ -884,4 +899,152 @@ public void testPipePluginValidation() { fail(e.getMessage()); } } + + @Test + public void testShowPipePluginAfterJarDeletedAndClusterRestart() throws Exception { + final String pluginName = "TEST_MISSING_JAR_PROCESSOR"; + final String pluginClassName = "org.apache.iotdb.pipe.it.plugin.TestMissingJarProcessor"; + final Path temporaryDir = Files.createTempDirectory("pipe-plugin-it-"); + final Path pluginJarPath = temporaryDir.resolve("test-missing-jar-processor.jar"); + + buildTestPipePluginJar(temporaryDir, pluginJarPath, pluginClassName); + + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipePlugin %s as '%s' USING URI '%s'", + pluginName, pluginClassName, pluginJarPath.toUri())); + } + + senderEnv.shutdownAllDataNodes(); + senderEnv.shutdownAllConfigNodes(); + + deletePluginJarUnderDataNodes("test-missing-jar-processor.jar"); + + senderEnv.startAllConfigNodes(); + senderEnv.startAllDataNodes(); + ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown(); + + boolean pluginFound = false; + boolean exceptionMessageFound = false; + SQLException lastException = null; + for (int retry = 0; retry < 10; retry++) { + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement(); + final ResultSet resultSet = statement.executeQuery("show pipe plugins")) { + while (resultSet.next()) { + if (pluginName.equalsIgnoreCase(resultSet.getString("PluginName"))) { + pluginFound = true; + final String exceptionMessage = resultSet.getString("ExceptionMessage"); + exceptionMessageFound = exceptionMessage != null && !exceptionMessage.trim().isEmpty(); + break; + } + } + lastException = null; + break; + } catch (final SQLException e) { + lastException = e; + Thread.sleep(1000); + } + } + if (lastException != null) { + throw lastException; + } + + Assert.assertTrue("Expected plugin in show pipe plugins result.", pluginFound); + Assert.assertTrue( + "Expected non-empty ExceptionMessage after deleting plugin jar and restarting cluster.", + exceptionMessageFound); + + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(String.format("drop pipePlugin %s", pluginName)); + } finally { + Files.deleteIfExists(pluginJarPath); + Files.deleteIfExists(temporaryDir); + } + } + + private void buildTestPipePluginJar( + final Path tempDir, final Path jarPath, final String pluginClassName) throws IOException { + final int lastDot = pluginClassName.lastIndexOf('.'); + final String packageName = pluginClassName.substring(0, lastDot); + final String simpleClassName = pluginClassName.substring(lastDot + 1); + final Path packageDir = tempDir.resolve(packageName.replace('.', File.separatorChar)); + Files.createDirectories(packageDir); + + final Path sourcePath = packageDir.resolve(simpleClassName + ".java"); + final String sourceCode = + "package " + + packageName + + ";\n" + + "import org.apache.iotdb.pipe.api.PipeProcessor;\n" + + "import org.apache.iotdb.pipe.api.collector.EventCollector;\n" + + "import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;\n" + + "import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;\n" + + "import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;\n" + + "import org.apache.iotdb.pipe.api.event.Event;\n" + + "import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;\n" + + "import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;\n" + + "import java.io.IOException;\n" + + "public class " + + simpleClassName + + " implements PipeProcessor {\n" + + " @Override public void validate(PipeParameterValidator validator) {}\n" + + " @Override public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) {}\n" + + " @Override public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) throws IOException { eventCollector.collect(tabletInsertionEvent); }\n" + + " @Override public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) throws IOException { eventCollector.collect(tsFileInsertionEvent); }\n" + + " @Override public void process(Event event, EventCollector eventCollector) throws IOException { eventCollector.collect(event); }\n" + + " @Override public void close() {}\n" + + "}\n"; + Files.write(sourcePath, sourceCode.getBytes(StandardCharsets.UTF_8)); + + final JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + Objects.requireNonNull(compiler, "JDK compiler is required for this test."); + + final int compileStatus = + compiler.run( + null, + null, + null, + "-cp", + System.getProperty("java.class.path"), + "-d", + tempDir.toString(), + sourcePath.toString()); + Assert.assertEquals("Failed to compile test pipe plugin class.", 0, compileStatus); + + final String classEntry = pluginClassName.replace('.', '/') + ".class"; + final Path classPath = tempDir.resolve(Paths.get(classEntry)); + try (final OutputStream fos = new FileOutputStream(jarPath.toFile()); + final JarOutputStream jos = new JarOutputStream(fos)) { + final JarEntry jarEntry = new JarEntry(classEntry); + jos.putNextEntry(jarEntry); + jos.write(Files.readAllBytes(classPath)); + jos.closeEntry(); + } + } + + private void deletePluginJarUnderDataNodes(final String targetJarFileName) throws IOException { + for (final DataNodeWrapper dataNodeWrapper : senderEnv.getDataNodeWrapperList()) { + final Path extPipePath = Paths.get(dataNodeWrapper.getNodePath(), "extPipe"); + if (!Files.exists(extPipePath)) { + continue; + } + try (final java.util.stream.Stream paths = Files.walk(extPipePath)) { + paths + .filter(Files::isRegularFile) + .filter(path -> path.getFileName().toString().equals(targetJarFileName)) + .forEach( + path -> { + try { + Files.deleteIfExists(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + } + } } From 8e9567f005cc42019269c5a619943c2786bbc5ca Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Fri, 17 Apr 2026 11:13:34 +0800 Subject: [PATCH 12/13] spotless --- .../auto/basic/IoTDBPipeSyntaxIT.java | 47 ++++++++++++++----- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java index 0c92c1d0b7916..80e025e2e8bbb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java @@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.isession.SessionConfig; import org.apache.iotdb.it.env.cluster.env.AbstractEnv; +import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic; @@ -56,6 +57,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -920,7 +922,7 @@ public void testShowPipePluginAfterJarDeletedAndClusterRestart() throws Exceptio senderEnv.shutdownAllDataNodes(); senderEnv.shutdownAllConfigNodes(); - deletePluginJarUnderDataNodes("test-missing-jar-processor.jar"); + deletePluginJarUnderConfigNodes(pluginName); senderEnv.startAllConfigNodes(); senderEnv.startAllDataNodes(); @@ -932,7 +934,7 @@ public void testShowPipePluginAfterJarDeletedAndClusterRestart() throws Exceptio for (int retry = 0; retry < 10; retry++) { try (final Connection connection = senderEnv.getConnection(); final Statement statement = connection.createStatement(); - final ResultSet resultSet = statement.executeQuery("show pipe plugins")) { + final ResultSet resultSet = statement.executeQuery("show pipeplugins")) { while (resultSet.next()) { if (pluginName.equalsIgnoreCase(resultSet.getString("PluginName"))) { pluginFound = true; @@ -961,8 +963,7 @@ public void testShowPipePluginAfterJarDeletedAndClusterRestart() throws Exceptio final Statement statement = connection.createStatement()) { statement.execute(String.format("drop pipePlugin %s", pluginName)); } finally { - Files.deleteIfExists(pluginJarPath); - Files.deleteIfExists(temporaryDir); + cleanupTemporaryDirectory(temporaryDir); } } @@ -1026,25 +1027,45 @@ private void buildTestPipePluginJar( } } - private void deletePluginJarUnderDataNodes(final String targetJarFileName) throws IOException { - for (final DataNodeWrapper dataNodeWrapper : senderEnv.getDataNodeWrapperList()) { - final Path extPipePath = Paths.get(dataNodeWrapper.getNodePath(), "extPipe"); - if (!Files.exists(extPipePath)) { + private void deletePluginJarUnderConfigNodes(final String pluginName) throws IOException { + for (final ConfigNodeWrapper configNodeWrapper : senderEnv.getConfigNodeWrapperList()) { + final Path pluginJarDirPath = + Paths.get( + configNodeWrapper.getNodePath(), "ext", "pipe", "install", pluginName.toUpperCase()); + if (!Files.exists(pluginJarDirPath)) { continue; } - try (final java.util.stream.Stream paths = Files.walk(extPipePath)) { - paths - .filter(Files::isRegularFile) - .filter(path -> path.getFileName().toString().equals(targetJarFileName)) + try (final java.util.stream.Stream children = Files.walk(pluginJarDirPath)) { + children + .filter(path -> !path.equals(pluginJarDirPath)) + .sorted(Comparator.reverseOrder()) .forEach( path -> { try { Files.deleteIfExists(path); - } catch (IOException e) { + } catch (final IOException e) { throw new RuntimeException(e); } }); } } } + + private void cleanupTemporaryDirectory(final Path temporaryDir) throws IOException { + if (!Files.exists(temporaryDir)) { + return; + } + try (final java.util.stream.Stream paths = Files.walk(temporaryDir)) { + paths + .sorted(Comparator.reverseOrder()) + .forEach( + path -> { + try { + Files.deleteIfExists(path); + } catch (final IOException e) { + throw new RuntimeException(e); + } + }); + } + } } From 2c011c358df6f48976f2b49974f6eccba02a668a Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Fri, 17 Apr 2026 15:57:17 +0800 Subject: [PATCH 13/13] spotless --- .../auto/basic/IoTDBPipeSyntaxIT.java | 104 ++---------------- .../pipe-count-point-processor-example.jar | Bin 0 -> 9446 bytes 2 files changed, 10 insertions(+), 94 deletions(-) create mode 100644 integration-test/src/test/resources/pipe-count-point-processor-example.jar diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java index 80e025e2e8bbb..0d637596951e2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java @@ -40,14 +40,8 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import javax.tools.JavaCompiler; -import javax.tools.ToolProvider; - import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -61,9 +55,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.jar.JarEntry; -import java.util.jar.JarOutputStream; import static org.junit.Assert.fail; @@ -905,11 +896,16 @@ public void testPipePluginValidation() { @Test public void testShowPipePluginAfterJarDeletedAndClusterRestart() throws Exception { final String pluginName = "TEST_MISSING_JAR_PROCESSOR"; - final String pluginClassName = "org.apache.iotdb.pipe.it.plugin.TestMissingJarProcessor"; - final Path temporaryDir = Files.createTempDirectory("pipe-plugin-it-"); - final Path pluginJarPath = temporaryDir.resolve("test-missing-jar-processor.jar"); - - buildTestPipePluginJar(temporaryDir, pluginJarPath, pluginClassName); + final String pluginClassName = "org.apache.iotdb.CountPointProcessor"; + final Path pluginJarPath = + Paths.get( + System.getProperty("user.dir"), + "src", + "test", + "resources", + "pipe-count-point-processor-example.jar") + .toAbsolutePath(); + System.out.println(pluginJarPath.toUri()); try (final Connection connection = senderEnv.getConnection(); final Statement statement = connection.createStatement()) { @@ -962,68 +958,6 @@ public void testShowPipePluginAfterJarDeletedAndClusterRestart() throws Exceptio try (final Connection connection = senderEnv.getConnection(); final Statement statement = connection.createStatement()) { statement.execute(String.format("drop pipePlugin %s", pluginName)); - } finally { - cleanupTemporaryDirectory(temporaryDir); - } - } - - private void buildTestPipePluginJar( - final Path tempDir, final Path jarPath, final String pluginClassName) throws IOException { - final int lastDot = pluginClassName.lastIndexOf('.'); - final String packageName = pluginClassName.substring(0, lastDot); - final String simpleClassName = pluginClassName.substring(lastDot + 1); - final Path packageDir = tempDir.resolve(packageName.replace('.', File.separatorChar)); - Files.createDirectories(packageDir); - - final Path sourcePath = packageDir.resolve(simpleClassName + ".java"); - final String sourceCode = - "package " - + packageName - + ";\n" - + "import org.apache.iotdb.pipe.api.PipeProcessor;\n" - + "import org.apache.iotdb.pipe.api.collector.EventCollector;\n" - + "import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;\n" - + "import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;\n" - + "import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;\n" - + "import org.apache.iotdb.pipe.api.event.Event;\n" - + "import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;\n" - + "import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;\n" - + "import java.io.IOException;\n" - + "public class " - + simpleClassName - + " implements PipeProcessor {\n" - + " @Override public void validate(PipeParameterValidator validator) {}\n" - + " @Override public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) {}\n" - + " @Override public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) throws IOException { eventCollector.collect(tabletInsertionEvent); }\n" - + " @Override public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) throws IOException { eventCollector.collect(tsFileInsertionEvent); }\n" - + " @Override public void process(Event event, EventCollector eventCollector) throws IOException { eventCollector.collect(event); }\n" - + " @Override public void close() {}\n" - + "}\n"; - Files.write(sourcePath, sourceCode.getBytes(StandardCharsets.UTF_8)); - - final JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); - Objects.requireNonNull(compiler, "JDK compiler is required for this test."); - - final int compileStatus = - compiler.run( - null, - null, - null, - "-cp", - System.getProperty("java.class.path"), - "-d", - tempDir.toString(), - sourcePath.toString()); - Assert.assertEquals("Failed to compile test pipe plugin class.", 0, compileStatus); - - final String classEntry = pluginClassName.replace('.', '/') + ".class"; - final Path classPath = tempDir.resolve(Paths.get(classEntry)); - try (final OutputStream fos = new FileOutputStream(jarPath.toFile()); - final JarOutputStream jos = new JarOutputStream(fos)) { - final JarEntry jarEntry = new JarEntry(classEntry); - jos.putNextEntry(jarEntry); - jos.write(Files.readAllBytes(classPath)); - jos.closeEntry(); } } @@ -1050,22 +984,4 @@ private void deletePluginJarUnderConfigNodes(final String pluginName) throws IOE } } } - - private void cleanupTemporaryDirectory(final Path temporaryDir) throws IOException { - if (!Files.exists(temporaryDir)) { - return; - } - try (final java.util.stream.Stream paths = Files.walk(temporaryDir)) { - paths - .sorted(Comparator.reverseOrder()) - .forEach( - path -> { - try { - Files.deleteIfExists(path); - } catch (final IOException e) { - throw new RuntimeException(e); - } - }); - } - } } diff --git a/integration-test/src/test/resources/pipe-count-point-processor-example.jar b/integration-test/src/test/resources/pipe-count-point-processor-example.jar new file mode 100644 index 0000000000000000000000000000000000000000..5db75914bf9bd993cf58acec99fa5c2e7491d2cd GIT binary patch literal 9446 zcmb_?1y~$g)@=hpx^Z`RcXxMpcMa|q+@0XA0YcCO8VL|wg9i%^A$V{L_Q-$lOmcH) z=FR)wfBLKbs=N2vd)Kb&Q+3wbD)LazKmY(705DQ-qYe0F0H5|1B{f7DWtF6uRi4Z+ zp3LC>$83-oZfD@>=STtf^aR3 zHN_!%_9sNyuaUsAvg(g*?L$QAue^0GPe&2XM!7%av3*)w>jN%}%*tFV2=VCiXb?3H z7NB|VMZka;P-8p=Qi63~Qqfx9$W&wLzreIi^x(;L!{WEA&=RtUj`5F0;{tylreq+l z+}|`%A$B*F-Ylxam|4J!>}s{`M%Ki8UB+KhouXoh^44gkC>4C-wS}TZY7n@{v)Efp zi1!#+A=~RzX~8u7LMwG&{G$JgTWrke$ASOr`aF9&a3@#m|0&JCwt)ZB!qnN++{W@h z7^42w(ALS_!t6hoBmY`*2UAZ=$NykV@^|B>teO7EiRu3qO8mb-JKH*2GMYPiIJz@B zJK6qtcXcwibaQiZWwi7*b#S)-Gp(tq5gW^H#BTSI6PI~hyegV`89C`S#e1v zb;)HteYeGa%z&%fdDk*^TbQyu4qN=~==v`a)#A1iEsj(vk(6~LO)zzXOe8V)7lIk> z!KD^n-?bFWIAsqO-ff@e9p(1HpR|0CWFE;wJMwtVS8gP5_C+|={GFouX$+ z1MJDg{@kiSiw!rzeU5hRe?e0_sge|$k&L#YdfYIjcR_uAXmqS?&s`h)Y-tM z6h(49N`BT(Sy)SbY76b?DR<0Tw8nZf_evD4i&+ixXk%<63?Y46Zk=}A6F<_t3TJuE z=uy!s%^|T4$eY=?#k^~!OgteQ@3B<#H?8YB%;w4%^ooIXzVGqF7QBW*DnP8gb&@YU z2XQlxW%q5lZuAj(1kdbS8_}>l^?e5yN!3LhEXiwR&!f@IBVj2H(~>AYr4$+ zolr)i)dWt6FoadMAku0OF`im$=k!{WxpvpvsfOy^Hj2l>`F+y?SMQz{Z)=9#$wV+W z3hPki;md5Kp7EH$v12wU3pY?YR6tm(>be@hwDaeaAl#B9qC z_0_ZoR&;X>rjGABV96p49!>O%l!r-}S|~mB6uV%`NFdjyfYJ?!H5_VU6 z;n{G`-~37y*MG_W0lLV!vh@Jzjo<`X2x!!gC(hyPu5)e{#lGk4?BB#IT4Lddouk^ih=THS{+wlz@W1sWRnRy_` z!2Q7*KVoLa7$L;X#fj zFy1`f`1&BCSqm**1RecBF;RWy4M?OyBFn{|u{xAO!tPw`jg;69?Yc8^3U%|#j3JQr z@E&a4MktyM3O9cOU}>`r@ou-B0xYWHPfo~JTcT0rj2ssbMbuBX6tfiGQn$M6nh^}a z^E?O-kYzg)N~LXv5o+t74+o9K#pMr)x-#m+X6 zJB&mqrJ0m-=HB|7whNZ*_9L0LT0E^rwSha0_67`E8*DzQH&u2>NwCY*Y)df3(3_RgM=coWnH6|Yp0F}O~E zea>51GD#NmTn(UrK)D+-`4|Y*)y`SPAO~(o0Kb6uvlY=v$?OzgAxsg&V1p;Yj)sgvZ;{BwS+6 zc}NKLkLeqmV11F&9#XL>?|NlHdi#E3K@FpxeiiCLo)3a-4+fiCgjQ5f>n$0kD=c@# z>=p_Vr@8SP$-g5=!_<&X#+4xn3mWU)h{6W&hN(mL z?9Twwf$Ojna~gi}1|{sv@~3T~=$iu>oxTTH_|$Gmqt4#C`p^X&DDPp_$&v~*+*Tjg zBZ?M7Fq6y$`%i#VlsrcRh*i+^VuhFSVIqmEL1PAXgfBLuC0ClZIE2kuleba`$5Y+9 zu{NhlLdz);1|nJp6Rk5K^VfC^4}CjZ9#`uByxGg z3n0e?*#QD>0wGl62VB4p)F#lKjf?6BJRSsy-8>NqwRJ#xv=PYl2QyMU%cfr^b|AHr{QN1kAfh zG<9fJg}$<692#{SsTXBoQsi5i%s#=IE*_m2l1Je>$xJ9`-A&BAn;E@7J-k48 zdE`RkXa4y{Fv->W_RB(=XlIrJZ#}~eJvyeIGEgWL^CFXW=acR`G8)<3Gb=NJ3RJdZ z(H*$HjCoFUx0P<|L%A*(Hato~u=-8v3bX72s27gw1^TB{(E*Eniy~Q#7Hf}dA=oq7 zq%9voLJ^=Lb#B+o7DdcsjgI=h*J+k(z6+@%S1{I98pM_$RJa|$f-cE~7V}9`JPFbv!#Z0Kew;qys#hO56o;ClP zzylIJ(qDcw;RA5Zh_haV8=IJ~qBtS9jL5V^&HPp#?H&9}o>d-#cZ2qsY4P>>1csDl ziTp&!u9#g2hf`h$}W0OzY)I9u{$hZKq(|%^6|u;)sQdsMMbNqBE=G5svQ$=WC?+ z1WdemX(`uC<*L-7Q(|2q|1mJ#-IW$2V0z<%!wyOdW<|9dqMhDsIymKX%8mVC*&4So z;#evG{Hb7jLyq4r?|2G~MI0O$Vu-{FEUwIf;;#GTV;-Ak64WL$DxV#KIbCsozBi-$38;vYE$+Hlx^_ z_jh^^U-NXT0pez^zK&3Q%{D`YjRO3;!3!w;X3aD6q&MO@W}i?^(G&0 zx9)HEA*(ukTMXLY>(@N^-cMibb)X0vuD0K!HW=L1`G078%o5vKJ`M0_|2hYYWz>JT zfkbe*e!ro#6wv&>UDw#Haci*~#tBR0k%()?eP4u|eF;;k2U@^d(TQ`)jn5fzGCWb` zjni~VcO)KEBJR#c&#;5BR6g(!!rnuET3P*#F(oO`^ zc!rJ-^hALpiJ%UQ|I9=kx_g)sKYn9a4g*{vDKW0Q-J*?&@vByb4F@k)+^znhaqG?d zkZ8!UbNqHvG3j9k7b4Un#_n{ej^MQ&&pZDB7GI@dSR~7k5j;iw+h{*ozy9W?R7MrA zR2=RN{<4p?Rdcp70#klFIQFyzeV&>GiOSbGN za(oZaT8hZLzbc}%L3gf=9(Li=!v=@CL_NFrM%lLPHEpdT;xK$U`*49c2yhBJb`8ZI z$30#)V34Z<+JpzSHJ_1T?lI5`eM@Rpp1!z#d%Um@?i3rzV-8R*-f)eEe^mZJA<;b&hF9g`@ytsO6!UjnqF$z4@daY2PCnsvp&v79_0|M3nL9pA zL5P}%o|G(@Q}qv3fv&oU0nW#Aine@+OO|~>JlB$x-O~%mXNk5-<$SNR!q7v6m z7^Op2jh${e`MNeF#Ww-Lm}HMPbSD(sAIBwS$P-knt*xUy-VP)Ef!Wb%J$XxbwGiG` z4-n;hT+95W_S?Es>9EiWW!LIGHWSU=bBt+QvBDXcxBX)HLSe6^lKF1*-jumQf}Tkg6h64C1;CTNfSPi1_C z*eF1elM34Wyv$#1pdAY+o&aGb9!l5_841tuQqrjncU5l1M74`4%|V z#)>$`rCD#e;(QpioCKptz?2*8%F!%^kZ)R@x#Isl!&pTTPJr-evwf?({W-ySDQRTG z7LrpHFV2+Z1~csSt>&GWZ)m=BQ`Lv5O(B`eD(mbB<>U`2bWF#=*JE2-S2SejRokOb_GgEt_@-@0zvg+0*bQ;u#mxh!0GbF zR(50m+quCR_9Mh!m*y?4a*+v7OY+{Q?JxX@lCs9r;`~KCE7B7i^~~?XhMbKVMyoqY zlms>~u=8!=yxL&i0er-&d6JE-RnCH;2dNvMr*pt({_*)PoR+uc9;i690hxoQW2qX- zcXBQu$itfDP#_#ExTZhNQO#Zq$8J(Tx`F1ELKH4fvT>e>!BU&=$E3iT0obwLovxVg z!Z0}`-FdUGQu_lMBc@856x)4%V-@rU$vJLd0f50LhJ@}Z{U4;#pOn!b^o;lqZbjt> zwW9J5W`)Vz-qg*lQRlfn{s3k`1`yfy2 zuOE4jc+c|Lt#1Ybo!XGr;2wxXq3}34c5S5K#pftHdJ$t6eKrr4D3dR6^%8QxyTbhw zIEN!PHkLasOKn}T0`Ex>rcd15`s>u+-rEk863-2R#@&{*BxDH3w+)lGJ8(|Jo`aYn z2?_B$cXuSP#f#Yb*X#%yUmdjHj62pd6Jih4FrZn7aS@j`J4tA`bF8R3Oy>szIwwRxh$D{O?%UvrkS z%dX%q*OUYkp+{7Na&w2;qPpBI0hD&la?MR-M&Vq>9-1}*1OT3my2wIS~2W!{D20GCo$Pict; zienWaa-nt)1fl9VYwl-088g}Wj%0eB2wU{_)MAc;cVPtv_|&Cv;?k^zmUY9&D(EdhrxQ@vn0^YA5wtA` zxv}}k#z!Vx;EpUNcAHbk&4xb4e1YpxafgZ(C)|sK>y9p5qKO@_$QQ0q?-%Z!*yDfR zuz+4IiiUV(#?yc)lxPK7%EVZrZz&deDE$-{=BqG3KGn?>qDMLrvFWYRW6r0vM$vOW zQyFeZ=Pkv@DONG99g_+k9eRB}Nv>gU$J3&l-NqlI{&KblVZd&pm5A^~791K+wW-Dc z&dXHdeGFVr26>v7DoF0$r7y?&>!}qI2S2IXvQ%W(%e|DR72}CwM_ZRTG+1URNOdlA z*6L|U7+`%@#8F=3-9f{0FqUqRv5~_CZ#LDyFs-iXYk()e@6FBDi-pf4!T`R7tj3lgdJ~6S@Dt3!F`<|S$8UAcD2l;xhVU${ks!_ z;6WBGW4c>9bWNi1*?!ZYZ3Jc3>LKJTm6F7*U+pdd~KuZ=)nbEe!!3iQ{p)@bh#mS?s_x1@~ZgqrS?HWho65{UB9%NWb#!g<##r&HhO zF*LC)ac=SC+Mi-uXzo-a$;Ic#S2fmns3B}FN}i6o%f8Tv_?$lbzHxG~G@~KC57(%f zv)Ksoa`v-Qn#0P+41*fI2*hnZafnxP{06i|xrrF`tzh&fxD=dv2q^KdkDmkxeyow=D!eCoM9D~i99RfQ=#T4HW}Vt=#w=<7*TMWbrtaS)NNzQ*c2qC^hIJd zh?apIMQ^op>3ybGu|qhnw=V5Sv6Z@1#p57X`>j%y9C?_aka>*7lC8`d z?R&EWzYsEhulS{?RF!VJW=Q*uH(&8`QQ4OGJ9UnsvU$ z(ftMM&+=o%EL0kGTQmIQujt+Vz}R%sk5%@x-HO4%td83Z>#AOd4Cyh8+?e=Wl}0yZ zSWqd8M~82W2*0MMk%4<#>*O%1C++9#j%fk{qf|0(TN4U4x0u=#6l6bxog*2)`Go8( z6zt+x)vVcE)Azhsj8*##-94`?h2MEBfak(36s$aTTfQUT4B^00mZ0B0z0E18Z(EJa zcz$b(hkI^uZvV{~;x-TM7U3RTF%;mR9&jN7DCpVV9Xf{gXI=7tS&Cme9Q+Mz*-)Ur zO00So37id;8(0xx`b4_g5ynhEZQP}8t7s>m5-;WJc~xt6Ri)-L0J?0C@Z;JSlrnyi zNYn0w?3sGux%vFMwRR(!{T{>FU|ly@ZVUyOGsQbYpQ5YInEwP+%YU4Dr1OLuX`jnsDG zAQ1G&tjFxFH7&J;-c%QlpJQo>`=rwR*&A~^{p1U0n-}g`=$mE^c}OgC`-}=<lJ9N|76Y39BI_e2Ha zh&QB%hx`|Y1Onm;XsFMMQiL@HOdg9y(@gJmvlO9XNGhzRT0m6wonqP&RI<&iN?^-D z{=w}BOM%-vHlgEQD@tnmicduVSnCt`yO*>2CvVsih%f`$nq%PKInZAZ(s%8?1v!LL zoIST(=e~B}p|`Aw5HTi|;Xd}eLk(1mGx3HB>{_im3jFJ{Pg#rNS_c#WApayo$^9xz z{Gko~P44+G1RG~32PSU^`x!06#LrR~FSfOp^n7GW8x^_I@KkHb-@?btAQo~iawLWd zX38GmdReu>Xx<;S=ht_kRDjjaP>4i2+&+x8Sl0ko9AXLbD>M|2@wXef-GfdgO5?1# z$2MK}Iy_dwL)WP12oV6nR}4i4lNb$v_97N0*%0A+o&9VXkhhi8FU?`OEh0b9HS8`q)rp@*9LH|LoYyJ)9RP=+h79qUO&Z zW36|9z&9`#Og-TNfAA1i4XS%*WHRTeCdup%kgggOKdJ1L{=j zAT!qmLG_^X)`g%?`PdROzb%ElJ)8xT7 z5RGd@-wrqx5y?KCGk2WuBe?@QIdQsCUwoVSj_ zqs`=+Qlq-W_g^ttYxQ=LqwScGfLdXzam@9^g7ztH6OQk@D|vj!ZSPocLg0Y8=*OM% zW)owJ4Ma_Z$81PRJfujA0(8RpkRdln2DWO#7txBtSM{RNbVLiSEKJU_O8yJ7q3q7rRq8r$eSJ1x{76?W0^G9a-z}+X;oCSRcFLa%O@BB2EZxx;+H_gRhWmH z(AePY!MHv4;KMG`I?VE>N}YVL5B1?G^5{3|sfW>W^2=vRNmDO3;@5^)yW4%dSRS-p zatVoO7kq39hBo_*2{UI;uc9R`Q9)jGh?BR`uQ(9>Z~Xxq1>&cMYr6OJboGK4N9v;_x(?et8zueAI*0L=%Q#V#!%zH9qAT^*eWRAV<6-XkRJ;W4GRqf{B@S-wAEtQeVQGN zo;KD0y;~*$ub+@&2!yK+h5@A6BOte2vNik4`#zO;zoj>HeHHL;XEvb932eBw?|18lz zng{+pnB*VWC&Bwrbl~sczujH_Q_+636Z|oLc!P%gTPlC^_Fvqq{s{QpTAF#az|{eM3){2urBY3FC$CF1|JxIZT$6?tfwA0d#QUJOtF(NdBhpZ*Wf CNjV1q literal 0 HcmV?d00001