From dec3d0c4bc484c8e604280203e47a755e3ef96f1 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Fri, 10 Apr 2026 14:18:11 +0200 Subject: [PATCH] [Fix #1296] Prevent double loading of the same process instance Signed-off-by: fjtirado --- .../impl/WorkflowDefinition.java | 15 +++++++++++++++ .../impl/WorkflowMutableInstance.java | 2 ++ .../AbstractPersistenceInstanceReader.java | 4 ++-- .../persistence/WorkflowPersistenceInstance.java | 9 ++++++++- .../test/AbstractHandlerPersistenceTest.java | 1 + 5 files changed, 28 insertions(+), 3 deletions(-) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 2a84f4f43..770f41367 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData { @@ -56,6 +57,7 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData private Cancellable everySchedule; private Cancellable cronSchedule; private Collection scheduledInstances = new ArrayList<>(); + private Map activeInstances = new ConcurrentHashMap<>(); private WorkflowDefinition( WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) { @@ -177,6 +179,18 @@ public void addScheduledInstance(WorkflowInstance workflowInstance) { scheduledInstances.add(workflowInstance); } + void removeInstance(WorkflowInstance instance) { + activeInstances.remove(instance.id()); + } + + void addInstance(WorkflowInstance instance) { + activeInstances.put(instance.id(), instance); + } + + public Optional activeInstance(String instanceId) { + return Optional.ofNullable(activeInstances.get(instanceId)); + } + @Override public WorkflowDefinitionId id() { return definitionId; @@ -194,6 +208,7 @@ public void close() { cronSchedule.cancel(); } scheduledInstances.clear(); + activeInstances.clear(); } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index 0c67713a7..cfd380523 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -63,6 +63,7 @@ protected WorkflowMutableInstance(WorkflowDefinition definition, String id, Work this.input = input; this.status = new AtomicReference<>(WorkflowStatus.PENDING); this.workflowContext = new WorkflowContext(definition, this); + definition.addInstance(this); } @Override @@ -120,6 +121,7 @@ private void whenCompleted(WorkflowModel result, Throwable ex) { if (ex != null) { handleException(ex instanceof CompletionException ? ex = ex.getCause() : ex); } + workflowContext.definition().removeInstance(this); } private void handleException(Throwable ex) { diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceReader.java index 586809bbc..723d81cc1 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceReader.java @@ -28,14 +28,14 @@ protected final Stream scanAll( String applicationId) { return operations .scanAll(applicationId, definition) - .map(v -> new WorkflowPersistenceInstance(definition, v)); + .map(v -> WorkflowPersistenceInstance.of(definition, v)); } protected final Optional find( PersistenceInstanceOperations operations, WorkflowDefinition definition, String instanceId) { return operations .readWorkflowInfo(definition, instanceId) - .map(i -> new WorkflowPersistenceInstance(definition, i)); + .map(i -> WorkflowPersistenceInstance.of(definition, i)); } @Override diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java index df767d6e2..47bc65bc3 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java @@ -18,6 +18,7 @@ import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutableInstance; import io.serverlessworkflow.impl.WorkflowStatus; @@ -28,7 +29,13 @@ public class WorkflowPersistenceInstance extends WorkflowMutableInstance { private final PersistenceWorkflowInfo info; - public WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) { + public static WorkflowInstance of(WorkflowDefinition definition, PersistenceWorkflowInfo info) { + return definition + .activeInstance(info.id()) + .orElseGet(() -> new WorkflowPersistenceInstance(definition, info)); + } + + private WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) { super(definition, info.id(), info.input()); this.info = info; this.startedAt = info.startedAt(); diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java index fe0bb9da2..05baba827 100644 --- a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java @@ -152,6 +152,7 @@ void testWorkflowInstance() throws InterruptedException { try (Stream stream = handlers.reader().scanAll(definition)) { assertThat(stream.count()).isEqualTo(1); } + definition.close(); instance = (WorkflowPersistenceInstance) handlers.reader().find(definition, workflowInstance.id()).orElseThrow();