Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.a2aproject.sdk.server.tasks.TaskManager;
import org.a2aproject.sdk.server.tasks.TaskStateProvider;
import org.a2aproject.sdk.spec.Event;
import org.a2aproject.sdk.spec.Task;
Expand Down Expand Up @@ -366,6 +367,26 @@ public boolean isClosed() {
return closed;
}

/**
* Sets the TaskManager for this queue.
* This allows MainEventBusProcessor to reuse the TaskManager created in DefaultRequestHandler,
* preserving the initial message in the task's history. Only relevant for messages
* resulting in new tasks.
*
* @param taskManager the TaskManager to associate with this queue, or null to clear it
*/
public abstract void setInitialMessageTaskManager(@Nullable TaskManager taskManager);

/**
* Returns the TaskManager associated with this queue.
* Returns null if no TaskManager has been set.
* Only relevant for messages resulting in new tasks.
*
* @return the associated TaskManager, or null
*/
@Nullable
public abstract TaskManager getInitialMessageTaskManager();

/**
* Internal method to close the queue gracefully.
* Delegates to {@link #doClose(boolean)} with immediate=false.
Expand Down Expand Up @@ -400,6 +421,8 @@ static class MainQueue extends EventQueue {
private final List<Runnable> onCloseCallbacks;
private final @Nullable TaskStateProvider taskStateProvider;
private final MainEventBus mainEventBus;
/** TaskManager for preserving initial message in new tasks; cleared after first use */
private volatile @Nullable TaskManager initialMessageTaskManager;

MainQueue(int queueSize,
@Nullable EventEnqueueHook hook,
Expand Down Expand Up @@ -440,6 +463,17 @@ public int getChildCount() {
return enqueueHook;
}

@Override
public void setInitialMessageTaskManager(@Nullable TaskManager taskManager) {
this.initialMessageTaskManager = taskManager;
}

@Override
@Nullable
public TaskManager getInitialMessageTaskManager() {
return initialMessageTaskManager;
}

@Override
public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException {
throw new UnsupportedOperationException("MainQueue cannot be consumed directly - use tap() to create a ChildQueue for consumption");
Expand Down Expand Up @@ -802,9 +836,27 @@ public void signalQueuePollerStarted() {
parent.signalQueuePollerStarted();
}

@Override
public void setInitialMessageTaskManager(@Nullable TaskManager taskManager) {
parent.setInitialMessageTaskManager(taskManager);
}

@Override
@Nullable
public TaskManager getInitialMessageTaskManager() {
return parent.getInitialMessageTaskManager();
}

@Override
protected void doClose(boolean immediate) {
super.doClose(immediate); // Sets closed flag

// Clear TaskManager from parent to prevent memory leak
// MainQueue can live for a long time, but TaskManager is only needed for first message
// Safe to do for all ChildQueues - clearing null is a no-op for resubscriptions
parent.setInitialMessageTaskManager(null);
LOGGER.debug("Cleared TaskManager from MainQueue on ChildQueue close");

Comment on lines +855 to +859
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Clearing the TaskManager here introduces a race condition for non-blocking requests (returnImmediately=true). In DefaultRequestHandler.onMessageSend, the ChildQueue is closed immediately in the finally block (line 617). If the MainEventBusProcessor hasn't processed the first event yet, the TaskManager (and thus the initial message) will be lost before it can be used. It is safer to clear the TaskManager in MainEventBusProcessor after its first successful use, or in MainQueue.doClose as a final cleanup.

if (immediate) {
// Immediate close: clear pending events from local queue
this.immediateClose = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
import org.a2aproject.sdk.server.tasks.TaskSerializationException;
import org.a2aproject.sdk.server.tasks.TaskStore;
import org.a2aproject.sdk.spec.A2AError;
import org.a2aproject.sdk.spec.A2AServerException;
import org.a2aproject.sdk.spec.Event;
import org.a2aproject.sdk.spec.InternalError;
import org.a2aproject.sdk.spec.Message;
import org.a2aproject.sdk.spec.Task;
import org.a2aproject.sdk.spec.StreamingEventKind;
import org.a2aproject.sdk.spec.TaskArtifactUpdateEvent;
Expand Down Expand Up @@ -207,7 +205,7 @@ private void processEvent(MainEventBusContext context) {
// If this throws, we distribute an error to ensure "persist before client visibility"

try {
boolean isFinal = updateTaskStore(taskId, event, isReplicated);
boolean isFinal = updateTaskStore(taskId, event, isReplicated, mainQueue);

eventToDistribute = event; // Success - distribute original event

Expand Down Expand Up @@ -279,9 +277,10 @@ private void processEvent(MainEventBusContext context) {
/**
* Updates TaskStore using TaskManager.process().
* <p>
* Creates a temporary TaskManager instance for this event and delegates to its process() method,
* which handles all event types (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent).
* This leverages existing TaskManager logic for status updates, artifact appending, message history, etc.
* Attempts to reuse the TaskManager from the MainQueue (if available), which preserves
* the initial message in the task's history. Falls back to creating a temporary TaskManager
* if none is available. The TaskManager.process() method handles all event types
* (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent).
* </p>
* <p>
* If persistence fails, the exception is propagated to processEvent() which distributes an
Expand All @@ -291,21 +290,32 @@ private void processEvent(MainEventBusContext context) {
*
* @param taskId the task ID
* @param event the event to persist
* @param isReplicated whether this is a replicated event
* @param mainQueue the main queue for this task
* @return true if the task reached a final state, false otherwise
* @throws InternalError if persistence fails
*/
private boolean updateTaskStore(String taskId, Event event, boolean isReplicated) throws InternalError {
private boolean updateTaskStore(String taskId, Event event, boolean isReplicated, EventQueue.MainQueue mainQueue) throws InternalError {
try {
// Extract contextId from event (all relevant events have it)
String contextId = extractContextId(event);

// Create temporary TaskManager instance for this event
TaskManager taskManager = new TaskManager(taskId, contextId, taskStore, null);
// Try to get TaskManager from MainQueue (preserves initial message for new tasks)
// Falls back to creating a temporary one for existing tasks
TaskManager taskManager = mainQueue.getInitialMessageTaskManager();
if (taskManager == null) {
// Create temporary TaskManager instance for this event
taskManager = new TaskManager(taskId, contextId, taskStore, null);
LOGGER.debug("Created temporary TaskManager for task {} (no TaskManager on queue)", taskId);
} else {
LOGGER.debug("Reusing TaskManager from queue for task {} (preserves initial message)", taskId);
}
Comment on lines +303 to +312
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To prevent memory leaks and ensure the TaskManager is only used for the initial event, it should be cleared from the queue after its first task-related event is processed. This also avoids the race condition identified in ChildQueue.doClose for non-blocking requests.

Suggested change
// Try to get TaskManager from MainQueue (preserves initial message for new tasks)
// Falls back to creating a temporary one for existing tasks
TaskManager taskManager = mainQueue.getInitialMessageTaskManager();
if (taskManager == null) {
// Create temporary TaskManager instance for this event
taskManager = new TaskManager(taskId, contextId, taskStore, null);
LOGGER.debug("Created temporary TaskManager for task {} (no TaskManager on queue)", taskId);
} else {
LOGGER.debug("Reusing TaskManager from queue for task {} (preserves initial message)", taskId);
}
// Try to get TaskManager from MainQueue (preserves initial message for new tasks)
// Falls back to creating a temporary one for existing tasks
TaskManager taskManager = mainQueue.getInitialMessageTaskManager();
boolean isInitialTaskManager = taskManager != null;
if (taskManager == null) {
// Create temporary TaskManager instance for this event
taskManager = new TaskManager(taskId, contextId, taskStore, null);
LOGGER.debug("Created temporary TaskManager for task {} (no TaskManager on queue)", taskId);
} else {
LOGGER.debug("Reusing TaskManager from queue for task {} (preserves initial message)", taskId);
}
// Use TaskManager.process() - handles all event types with existing logic
boolean isFinal = taskManager.process(event, isReplicated);
// Clear TaskManager from queue after first task-related event to prevent memory leak
if (isInitialTaskManager && (event instanceof Task || event instanceof TaskStatusUpdateEvent ||
event instanceof TaskArtifactUpdateEvent || event instanceof A2AError)) {
mainQueue.setInitialMessageTaskManager(null);
}


// Use TaskManager.process() - handles all event types with existing logic
boolean isFinal = taskManager.process(event, isReplicated);
LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {} (final: {}, replicated: {})",
taskId, event.getClass().getSimpleName(), isFinal, isReplicated);

return isFinal;

} catch (TaskSerializationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import org.a2aproject.sdk.spec.TaskPushNotificationConfig;
import org.a2aproject.sdk.spec.TaskQueryParams;
import org.a2aproject.sdk.spec.TaskState;
import org.a2aproject.sdk.spec.TaskStatusUpdateEvent;
import org.a2aproject.sdk.spec.UnsupportedOperationError;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;
Expand Down Expand Up @@ -449,6 +448,14 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte

// Create queue with real taskId (no tempId parameter needed)
EventQueue queue = queueManager.createOrTap(queueTaskId);

// For NEW tasks, associate the TaskManager with the queue
// This allows MainEventBusProcessor to reuse it, preserving the initial message
if (mss.task() == null) {
queue.setInitialMessageTaskManager(mss.taskManager);
LOGGER.debug("Set TaskManager on queue for new task {}", queueTaskId);
}

final java.util.concurrent.atomic.AtomicReference<@NonNull String> taskId = new java.util.concurrent.atomic.AtomicReference<>(queueTaskId);
ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor, eventConsumerExecutor);

Expand Down Expand Up @@ -649,6 +656,13 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
EventQueue queue = queueManager.createOrTap(queueTaskId);
LOGGER.debug("Created/tapped queue for task {}: {}", taskId.get(), queue);

// For NEW tasks, associate the TaskManager with the queue
// This allows MainEventBusProcessor to reuse it, preserving the initial message
if (mss.task() == null) {
queue.setInitialMessageTaskManager(mss.taskManager);
LOGGER.debug("Set TaskManager on queue for new task {} (streaming)", queueTaskId);
}

// Store push notification config SYNCHRONOUSLY for new tasks before agent starts
// This ensures config is available when MainEventBusProcessor sends push notifications
// For existing tasks, config is stored in initMessageSend()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,59 @@ void testSendMessageStream_WithNonExistentTaskId_ThrowsTaskNotFoundError() {
"Expected TaskNotFoundError when onMessageSendStream references a non-existent taskId");
}

/**
* Test that initial message is preserved in task history when agent submits task.
* Verifies that MainEventBusProcessor reuses TaskManager from queue (with initial message)
* rather than creating a new one without it.
*/
@Test
void testNewTask_InitialMessagePreservedInHistory() throws Exception {
// Arrange: Create an initial message
CountDownLatch agentCompleted = new CountDownLatch(1);

agentExecutorExecute = (context, emitter) -> {
// Submit a task - should preserve initial message in history
emitter.submit();
emitter.complete();
agentCompleted.countDown();
};

Message initialMessage = Message.builder()
.messageId("msg-initial-123")
.role(Message.Role.ROLE_USER)
.parts(new TextPart("Hello, please do something"))
.build();

MessageSendParams params = MessageSendParams.builder()
.message(initialMessage)
.configuration(DEFAULT_CONFIG)
.build();

// Act: Send message (non-streaming)
EventKind eventKind = requestHandler.onMessageSend(params, NULL_CONTEXT);

// Assert: Task returned
assertNotNull(eventKind);
assertInstanceOf(Task.class, eventKind);
Task result = (Task) eventKind;

// Wait for agent to complete
assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete");
Thread.sleep(200); // Allow MainEventBusProcessor to persist

// Verify task in TaskStore has initial message in history
Task storedTask = taskStore.get(result.id());
assertNotNull(storedTask, "Task should be in TaskStore");
assertNotNull(storedTask.history(), "Task should have history");
assertFalse(storedTask.history().isEmpty(), "Task history should not be empty");

// The initial message should be in the history
boolean foundInitialMessage = storedTask.history().stream()
.anyMatch(msg -> "msg-initial-123".equals(msg.messageId()));
assertTrue(foundInitialMessage,
"Initial message should be in task history, but history was: " + storedTask.history());
}

/**
* Verification for Codex adversarial review finding:
* When a follow-up message includes taskId but omits contextId,
Expand Down
Loading