From 5c5bc100fd1372d31af62003fecbb25ae1053728 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Fri, 17 Apr 2026 11:45:23 +0200 Subject: [PATCH] feat: preserve initial message in task history by reusing TaskManager Pass TaskManager from DefaultRequestHandler to MainEventBusProcessor via EventQueue, allowing the initial message to be included in task history when the agent submits the task. Clear TaskManager from MainQueue on ChildQueue close to prevent memory leak, as MainQueue can live for a long time but TaskManager is only needed for the first event. Co-Authored-By: Claude Sonnet 4.5 --- .../sdk/server/events/EventQueue.java | 52 ++++++++++++++++++ .../server/events/MainEventBusProcessor.java | 28 ++++++---- .../DefaultRequestHandler.java | 16 +++++- .../DefaultRequestHandlerTest.java | 53 +++++++++++++++++++ 4 files changed, 139 insertions(+), 10 deletions(-) diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/events/EventQueue.java b/server-common/src/main/java/org/a2aproject/sdk/server/events/EventQueue.java index 253a0c8ad..a10ff81ba 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/events/EventQueue.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/events/EventQueue.java @@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.a2aproject.sdk.server.tasks.TaskManager; import org.a2aproject.sdk.server.tasks.TaskStateProvider; import org.a2aproject.sdk.spec.Event; import org.a2aproject.sdk.spec.Task; @@ -366,6 +367,26 @@ public boolean isClosed() { return closed; } + /** + * Sets the TaskManager for this queue. + * This allows MainEventBusProcessor to reuse the TaskManager created in DefaultRequestHandler, + * preserving the initial message in the task's history. Only relevant for messages + * resulting in new tasks. + * + * @param taskManager the TaskManager to associate with this queue, or null to clear it + */ + public abstract void setInitialMessageTaskManager(@Nullable TaskManager taskManager); + + /** + * Returns the TaskManager associated with this queue. + * Returns null if no TaskManager has been set. + * Only relevant for messages resulting in new tasks. + * + * @return the associated TaskManager, or null + */ + @Nullable + public abstract TaskManager getInitialMessageTaskManager(); + /** * Internal method to close the queue gracefully. * Delegates to {@link #doClose(boolean)} with immediate=false. @@ -400,6 +421,8 @@ static class MainQueue extends EventQueue { private final List onCloseCallbacks; private final @Nullable TaskStateProvider taskStateProvider; private final MainEventBus mainEventBus; + /** TaskManager for preserving initial message in new tasks; cleared after first use */ + private volatile @Nullable TaskManager initialMessageTaskManager; MainQueue(int queueSize, @Nullable EventEnqueueHook hook, @@ -440,6 +463,17 @@ public int getChildCount() { return enqueueHook; } + @Override + public void setInitialMessageTaskManager(@Nullable TaskManager taskManager) { + this.initialMessageTaskManager = taskManager; + } + + @Override + @Nullable + public TaskManager getInitialMessageTaskManager() { + return initialMessageTaskManager; + } + @Override public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException { throw new UnsupportedOperationException("MainQueue cannot be consumed directly - use tap() to create a ChildQueue for consumption"); @@ -802,9 +836,27 @@ public void signalQueuePollerStarted() { parent.signalQueuePollerStarted(); } + @Override + public void setInitialMessageTaskManager(@Nullable TaskManager taskManager) { + parent.setInitialMessageTaskManager(taskManager); + } + + @Override + @Nullable + public TaskManager getInitialMessageTaskManager() { + return parent.getInitialMessageTaskManager(); + } + @Override protected void doClose(boolean immediate) { super.doClose(immediate); // Sets closed flag + + // Clear TaskManager from parent to prevent memory leak + // MainQueue can live for a long time, but TaskManager is only needed for first message + // Safe to do for all ChildQueues - clearing null is a no-op for resubscriptions + parent.setInitialMessageTaskManager(null); + LOGGER.debug("Cleared TaskManager from MainQueue on ChildQueue close"); + if (immediate) { // Immediate close: clear pending events from local queue this.immediateClose = true; diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java b/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java index 3b5019856..14be6d498 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java @@ -14,10 +14,8 @@ import org.a2aproject.sdk.server.tasks.TaskSerializationException; import org.a2aproject.sdk.server.tasks.TaskStore; import org.a2aproject.sdk.spec.A2AError; -import org.a2aproject.sdk.spec.A2AServerException; import org.a2aproject.sdk.spec.Event; import org.a2aproject.sdk.spec.InternalError; -import org.a2aproject.sdk.spec.Message; import org.a2aproject.sdk.spec.Task; import org.a2aproject.sdk.spec.StreamingEventKind; import org.a2aproject.sdk.spec.TaskArtifactUpdateEvent; @@ -207,7 +205,7 @@ private void processEvent(MainEventBusContext context) { // If this throws, we distribute an error to ensure "persist before client visibility" try { - boolean isFinal = updateTaskStore(taskId, event, isReplicated); + boolean isFinal = updateTaskStore(taskId, event, isReplicated, mainQueue); eventToDistribute = event; // Success - distribute original event @@ -279,9 +277,10 @@ private void processEvent(MainEventBusContext context) { /** * Updates TaskStore using TaskManager.process(). *

- * Creates a temporary TaskManager instance for this event and delegates to its process() method, - * which handles all event types (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent). - * This leverages existing TaskManager logic for status updates, artifact appending, message history, etc. + * Attempts to reuse the TaskManager from the MainQueue (if available), which preserves + * the initial message in the task's history. Falls back to creating a temporary TaskManager + * if none is available. The TaskManager.process() method handles all event types + * (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent). *

*

* If persistence fails, the exception is propagated to processEvent() which distributes an @@ -291,21 +290,32 @@ private void processEvent(MainEventBusContext context) { * * @param taskId the task ID * @param event the event to persist + * @param isReplicated whether this is a replicated event + * @param mainQueue the main queue for this task * @return true if the task reached a final state, false otherwise * @throws InternalError if persistence fails */ - private boolean updateTaskStore(String taskId, Event event, boolean isReplicated) throws InternalError { + private boolean updateTaskStore(String taskId, Event event, boolean isReplicated, EventQueue.MainQueue mainQueue) throws InternalError { try { // Extract contextId from event (all relevant events have it) String contextId = extractContextId(event); - // Create temporary TaskManager instance for this event - TaskManager taskManager = new TaskManager(taskId, contextId, taskStore, null); + // Try to get TaskManager from MainQueue (preserves initial message for new tasks) + // Falls back to creating a temporary one for existing tasks + TaskManager taskManager = mainQueue.getInitialMessageTaskManager(); + if (taskManager == null) { + // Create temporary TaskManager instance for this event + taskManager = new TaskManager(taskId, contextId, taskStore, null); + LOGGER.debug("Created temporary TaskManager for task {} (no TaskManager on queue)", taskId); + } else { + LOGGER.debug("Reusing TaskManager from queue for task {} (preserves initial message)", taskId); + } // Use TaskManager.process() - handles all event types with existing logic boolean isFinal = taskManager.process(event, isReplicated); LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {} (final: {}, replicated: {})", taskId, event.getClass().getSimpleName(), isFinal, isReplicated); + return isFinal; } catch (TaskSerializationException e) { diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java index 0e3aab7f4..c7f8fb81f 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java @@ -66,7 +66,6 @@ import org.a2aproject.sdk.spec.TaskPushNotificationConfig; import org.a2aproject.sdk.spec.TaskQueryParams; import org.a2aproject.sdk.spec.TaskState; -import org.a2aproject.sdk.spec.TaskStatusUpdateEvent; import org.a2aproject.sdk.spec.UnsupportedOperationError; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; @@ -449,6 +448,14 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // Create queue with real taskId (no tempId parameter needed) EventQueue queue = queueManager.createOrTap(queueTaskId); + + // For NEW tasks, associate the TaskManager with the queue + // This allows MainEventBusProcessor to reuse it, preserving the initial message + if (mss.task() == null) { + queue.setInitialMessageTaskManager(mss.taskManager); + LOGGER.debug("Set TaskManager on queue for new task {}", queueTaskId); + } + final java.util.concurrent.atomic.AtomicReference<@NonNull String> taskId = new java.util.concurrent.atomic.AtomicReference<>(queueTaskId); ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor, eventConsumerExecutor); @@ -649,6 +656,13 @@ public Flow.Publisher onMessageSendStream( EventQueue queue = queueManager.createOrTap(queueTaskId); LOGGER.debug("Created/tapped queue for task {}: {}", taskId.get(), queue); + // For NEW tasks, associate the TaskManager with the queue + // This allows MainEventBusProcessor to reuse it, preserving the initial message + if (mss.task() == null) { + queue.setInitialMessageTaskManager(mss.taskManager); + LOGGER.debug("Set TaskManager on queue for new task {} (streaming)", queueTaskId); + } + // Store push notification config SYNCHRONOUSLY for new tasks before agent starts // This ensures config is available when MainEventBusProcessor sends push notifications // For existing tasks, config is stored in initMessageSend() diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandlerTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandlerTest.java index 8914586f5..cd7f4c3a6 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandlerTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandlerTest.java @@ -799,6 +799,59 @@ void testSendMessageStream_WithNonExistentTaskId_ThrowsTaskNotFoundError() { "Expected TaskNotFoundError when onMessageSendStream references a non-existent taskId"); } + /** + * Test that initial message is preserved in task history when agent submits task. + * Verifies that MainEventBusProcessor reuses TaskManager from queue (with initial message) + * rather than creating a new one without it. + */ + @Test + void testNewTask_InitialMessagePreservedInHistory() throws Exception { + // Arrange: Create an initial message + CountDownLatch agentCompleted = new CountDownLatch(1); + + agentExecutorExecute = (context, emitter) -> { + // Submit a task - should preserve initial message in history + emitter.submit(); + emitter.complete(); + agentCompleted.countDown(); + }; + + Message initialMessage = Message.builder() + .messageId("msg-initial-123") + .role(Message.Role.ROLE_USER) + .parts(new TextPart("Hello, please do something")) + .build(); + + MessageSendParams params = MessageSendParams.builder() + .message(initialMessage) + .configuration(DEFAULT_CONFIG) + .build(); + + // Act: Send message (non-streaming) + EventKind eventKind = requestHandler.onMessageSend(params, NULL_CONTEXT); + + // Assert: Task returned + assertNotNull(eventKind); + assertInstanceOf(Task.class, eventKind); + Task result = (Task) eventKind; + + // Wait for agent to complete + assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete"); + Thread.sleep(200); // Allow MainEventBusProcessor to persist + + // Verify task in TaskStore has initial message in history + Task storedTask = taskStore.get(result.id()); + assertNotNull(storedTask, "Task should be in TaskStore"); + assertNotNull(storedTask.history(), "Task should have history"); + assertFalse(storedTask.history().isEmpty(), "Task history should not be empty"); + + // The initial message should be in the history + boolean foundInitialMessage = storedTask.history().stream() + .anyMatch(msg -> "msg-initial-123".equals(msg.messageId())); + assertTrue(foundInitialMessage, + "Initial message should be in task history, but history was: " + storedTask.history()); + } + /** * Verification for Codex adversarial review finding: * When a follow-up message includes taskId but omits contextId,