Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData {

Expand All @@ -56,6 +57,7 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
private Cancellable everySchedule;
private Cancellable cronSchedule;
private Collection<WorkflowInstance> scheduledInstances = new ArrayList<>();
private Map<String, WorkflowInstance> activeInstances = new ConcurrentHashMap<>();

private WorkflowDefinition(
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
Expand Down Expand Up @@ -177,6 +179,18 @@ public void addScheduledInstance(WorkflowInstance workflowInstance) {
scheduledInstances.add(workflowInstance);
}

void removeInstance(WorkflowInstance instance) {
activeInstances.remove(instance.id());
}

void addInstance(WorkflowInstance instance) {
activeInstances.put(instance.id(), instance);
}

public Optional<WorkflowInstance> activeInstance(String instanceId) {
return Optional.ofNullable(activeInstances.get(instanceId));
}

@Override
public WorkflowDefinitionId id() {
return definitionId;
Expand All @@ -194,6 +208,7 @@ public void close() {
cronSchedule.cancel();
}
scheduledInstances.clear();
activeInstances.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ protected WorkflowMutableInstance(WorkflowDefinition definition, String id, Work
this.input = input;
this.status = new AtomicReference<>(WorkflowStatus.PENDING);
this.workflowContext = new WorkflowContext(definition, this);
definition.addInstance(this);
}

@Override
Expand Down Expand Up @@ -120,6 +121,7 @@ private void whenCompleted(WorkflowModel result, Throwable ex) {
if (ex != null) {
handleException(ex instanceof CompletionException ? ex = ex.getCause() : ex);
}
workflowContext.definition().removeInstance(this);
}

private void handleException(Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ protected final Stream<WorkflowInstance> scanAll(
String applicationId) {
return operations
.scanAll(applicationId, definition)
.map(v -> new WorkflowPersistenceInstance(definition, v));
.map(v -> WorkflowPersistenceInstance.of(definition, v));
}

protected final Optional<WorkflowInstance> find(
PersistenceInstanceOperations operations, WorkflowDefinition definition, String instanceId) {
return operations
.readWorkflowInfo(definition, instanceId)
.map(i -> new WorkflowPersistenceInstance(definition, i));
.map(i -> WorkflowPersistenceInstance.of(definition, i));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowInstance;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowMutableInstance;
import io.serverlessworkflow.impl.WorkflowStatus;
Expand All @@ -28,7 +29,13 @@ public class WorkflowPersistenceInstance extends WorkflowMutableInstance {

private final PersistenceWorkflowInfo info;

public WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) {
public static WorkflowInstance of(WorkflowDefinition definition, PersistenceWorkflowInfo info) {
return definition
.activeInstance(info.id())
.orElseGet(() -> new WorkflowPersistenceInstance(definition, info));
}

private WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) {
super(definition, info.id(), info.input());
this.info = info;
this.startedAt = info.startedAt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ void testWorkflowInstance() throws InterruptedException {
try (Stream<WorkflowInstance> stream = handlers.reader().scanAll(definition)) {
assertThat(stream.count()).isEqualTo(1);
}
definition.close();
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that I have to do this in order the test to work, it is enough proof that the change is working.
If the instance map is not clear, then the instance is reused and none of the methods are invoked, failing the test.

instance =
(WorkflowPersistenceInstance)
handlers.reader().find(definition, workflowInstance.id()).orElseThrow();
Expand Down
Loading