Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
*/
package org.knime.python3;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
Expand Down Expand Up @@ -105,8 +107,18 @@ public void registerFromPandasColumnConverter(final String pythonModule, final S
}

private static class DummyPythonGateway implements PythonGateway<DummyEntryPoint> {
private final IOException m_closeException;

private boolean m_isClosed = false;

/**
 * Creates a dummy gateway without a configured close exception, i.e. {@link #close()} only marks the gateway
 * as closed and returns normally.
 */
DummyPythonGateway() {
this(null);
}

/**
 * Creates a dummy gateway that optionally fails when it is closed.
 *
 * @param closeException the exception that {@link #close()} throws after marking the gateway as closed, or
 *            {@code null} if closing should succeed
 */
DummyPythonGateway(final IOException closeException) {
m_closeException = closeException;
}

@Override
public DummyEntryPoint getEntryPoint() {
return null;
Expand All @@ -125,6 +137,9 @@ public InputStream getStandardErrorStream() {
@Override
public void close() throws IOException {
    // The closed flag is set first so that it is observable even if closing fails below.
    m_isClosed = true;

    final IOException configuredFailure = m_closeException;
    if (configuredFailure == null) {
        return;
    }
    throw configuredFailure;
}

public boolean isClosed() {
Expand Down Expand Up @@ -198,4 +213,29 @@ public void testTrackerClosesOnGateClose() throws IOException {
TRACKER.onPythonGatewayCreationGateClose();
assertTrue(gateway.isClosed());
}

@SuppressWarnings("resource")
@Test
public void testTrackerClosesForCheckpoint() throws IOException {
    // Register a gateway that closes without error, then run the checkpoint cleanup.
    final DummyPythonGateway trackedGateway = new DummyPythonGateway();
    TRACKER.createTrackedGateway(trackedGateway);

    TRACKER.clearForCheckpoint();

    final boolean closedByCleanup = trackedGateway.isClosed();
    assertTrue("Gateway should be closed after clearForCheckpoint", closedByCleanup);
}

@SuppressWarnings("resource")
@Test
public void testCheckpointCleanupPropagatesIOException() {
    // A gateway whose close() always fails with the given exception.
    final IOException closeFailure = new IOException("close failed");
    final DummyPythonGateway failingGateway = new DummyPythonGateway(closeFailure);
    TRACKER.createTrackedGateway(failingGateway);

    final IOException thrown = assertThrows("Expected IOException when gateway close fails during checkpoint",
        IOException.class, TRACKER::clearForCheckpoint);

    // The close attempt must have happened although it ended with an exception.
    final boolean closeAttempted = failingGateway.isClosed();
    assertTrue("Gateway should be closed even when IOException was thrown", closeAttempted);

    final String outerMessage = thrown.getMessage();
    assertEquals("Exception message should indicate aborting Python process failed",
        "Aborting Python process failed.", outerMessage);

    final String causeMessage = thrown.getCause().getMessage();
    assertEquals("Original cause message should be preserved",
        "close failed", causeMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,15 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

import org.knime.core.checkpoint.PhasedInit;
import org.knime.core.checkpoint.PhasedInitSupport;
import org.knime.core.node.NodeLogger;
import org.knime.python3.PythonGatewayCreationGate.PythonGatewayCreationGateListener;

Expand All @@ -79,6 +83,18 @@ public final class PythonGatewayTracker implements PythonGatewayCreationGateList

private PythonGatewayTracker() {
m_openGateways = gatewaySet();
// Support CRaC (Coordinated Restore at Checkpoint) and close all connections prior to checkpointing
PhasedInitSupport.registerOrActivate(new PhasedInit<RuntimeException>() {
@Override
public void beforeCheckpoint() throws RuntimeException {
    // This hook only declares RuntimeException, so an I/O failure during cleanup is rethrown unchecked.
    try {
        clearForCheckpoint();
    } catch (final IOException closeFailure) {
        final String description = "Error when forcefully terminating Python processes before checkpointing";
        throw new UncheckedIOException(description, closeFailure);
    }
}
});
Comment on lines +87 to +97
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are race conditions on checkpoint/restore, which should not happen by design. Restore will be a controlled phase, and checkpoint will only happen when the init of all bundles has stabilized (but no workflows have been executed). So, leaving unchanged!

}

/**
Expand Down Expand Up @@ -107,15 +123,30 @@ public void onPythonGatewayCreationGateClose() {
}

/**
 * Closes all open gateways and reports them at error level, using the message for the installation use case.
 *
 * @throws IOException if an error occurs while closing the gateways
 */
void clear() throws IOException {
    // Format string; the placeholders are the number of open gateways and the triggering thread's name.
    final String installationMessage =
        "Found running Python processes (%d). Aborting them to allow installation process. "
            + "If this leads to failures in node execution, "
            + "please restart those nodes once the installation has finished. Triggered from thread '%s'.";
    final Consumer<String> errorLog = LOGGER::error;
    clear(errorLog, installationMessage);
}

/**
 * Closes all open gateways and reports them at info level, using the message for the checkpointing use case.
 *
 * @throws IOException if an error occurs while closing the gateways
 */
void clearForCheckpoint() throws IOException {
    // Format string; the placeholders are the number of open gateways and the triggering thread's name.
    final String checkpointMessage =
        "Found running Python processes (%d). Aborting them prior to checkpointing. Triggered from thread '%s'.";
    final Consumer<String> infoLog = LOGGER::info;
    clear(infoLog, checkpointMessage);
}

/**
* Closes all open gateways and logs a message using the provided consumer.
*
* @param logMessageConsumer consumer for logging messages
* @param logMessage message format string with placeholders for gateway count and thread name
* @throws IOException if an error occurs while closing the gateways
*/
private void clear(final Consumer<String> logMessageConsumer, final String logMessage) throws IOException {
if (m_openGateways.isEmpty()) {
return;
}

LOGGER.errorWithFormat(
"Found running Python processes (%d). Aborting them to allow installation process. "
+ "If this leads to failures in node execution, "
+ "please restart those nodes once the installation has finished. Triggered from thread '%s'.",
m_openGateways.size(), Thread.currentThread().getName());
logMessageConsumer.accept(String.format(logMessage, m_openGateways.size(), Thread.currentThread().getName()));

var exceptions = new ArrayList<Exception>();
for (var gateway : m_openGateways) {
Expand Down