diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/events/EventQueue.java b/server-common/src/main/java/org/a2aproject/sdk/server/events/EventQueue.java index 253a0c8ad..a10ff81ba 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/events/EventQueue.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/events/EventQueue.java @@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.a2aproject.sdk.server.tasks.TaskManager; import org.a2aproject.sdk.server.tasks.TaskStateProvider; import org.a2aproject.sdk.spec.Event; import org.a2aproject.sdk.spec.Task; @@ -366,6 +367,26 @@ public boolean isClosed() { return closed; } + /** + * Sets the TaskManager for this queue. + * This allows MainEventBusProcessor to reuse the TaskManager created in DefaultRequestHandler, + * preserving the initial message in the task's history. Only relevant for messages + * resulting in new tasks. + * + * @param taskManager the TaskManager to associate with this queue, or null to clear it + */ + public abstract void setInitialMessageTaskManager(@Nullable TaskManager taskManager); + + /** + * Returns the TaskManager associated with this queue. + * Returns null if no TaskManager has been set. + * Only relevant for messages resulting in new tasks. + * + * @return the associated TaskManager, or null + */ + @Nullable + public abstract TaskManager getInitialMessageTaskManager(); + /** * Internal method to close the queue gracefully. * Delegates to {@link #doClose(boolean)} with immediate=false. @@ -400,6 +421,8 @@ static class MainQueue extends EventQueue { private final List onCloseCallbacks; private final @Nullable TaskStateProvider taskStateProvider; private final MainEventBus mainEventBus; + /** TaskManager for preserving initial message in new tasks; cleared after first use */ + private volatile @Nullable TaskManager initialMessageTaskManager; MainQueue(int queueSize, @Nullable EventEnqueueHook hook, @@ -440,6 +463,17 @@ public int getChildCount() { return enqueueHook; } + @Override + public void setInitialMessageTaskManager(@Nullable TaskManager taskManager) { + this.initialMessageTaskManager = taskManager; + } + + @Override + @Nullable + public TaskManager getInitialMessageTaskManager() { + return initialMessageTaskManager; + } + @Override public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException { throw new UnsupportedOperationException("MainQueue cannot be consumed directly - use tap() to create a ChildQueue for consumption"); @@ -802,9 +836,27 @@ public void signalQueuePollerStarted() { parent.signalQueuePollerStarted(); } + @Override + public void setInitialMessageTaskManager(@Nullable TaskManager taskManager) { + parent.setInitialMessageTaskManager(taskManager); + } + + @Override + @Nullable + public TaskManager getInitialMessageTaskManager() { + return parent.getInitialMessageTaskManager(); + } + @Override protected void doClose(boolean immediate) { super.doClose(immediate); // Sets closed flag + + // Clear TaskManager from parent to prevent memory leak + // MainQueue can live for a long time, but TaskManager is only needed for first message + // Safe to do for all ChildQueues - clearing null is a no-op for resubscriptions + parent.setInitialMessageTaskManager(null); + LOGGER.debug("Cleared TaskManager from MainQueue on ChildQueue close"); + if (immediate) { // Immediate close: clear pending events from local queue this.immediateClose = true; diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java b/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java index 3b5019856..14be6d498 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/events/MainEventBusProcessor.java @@ -14,10 +14,8 @@ import org.a2aproject.sdk.server.tasks.TaskSerializationException; import org.a2aproject.sdk.server.tasks.TaskStore; import org.a2aproject.sdk.spec.A2AError; -import org.a2aproject.sdk.spec.A2AServerException; import org.a2aproject.sdk.spec.Event; import org.a2aproject.sdk.spec.InternalError; -import org.a2aproject.sdk.spec.Message; import org.a2aproject.sdk.spec.Task; import org.a2aproject.sdk.spec.StreamingEventKind; import org.a2aproject.sdk.spec.TaskArtifactUpdateEvent; @@ -207,7 +205,7 @@ private void processEvent(MainEventBusContext context) { // If this throws, we distribute an error to ensure "persist before client visibility" try { - boolean isFinal = updateTaskStore(taskId, event, isReplicated); + boolean isFinal = updateTaskStore(taskId, event, isReplicated, mainQueue); eventToDistribute = event; // Success - distribute original event @@ -279,9 +277,10 @@ private void processEvent(MainEventBusContext context) { /** * Updates TaskStore using TaskManager.process(). *

- * Creates a temporary TaskManager instance for this event and delegates to its process() method, - * which handles all event types (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent). - * This leverages existing TaskManager logic for status updates, artifact appending, message history, etc. + * Attempts to reuse the TaskManager from the MainQueue (if available), which preserves + * the initial message in the task's history. Falls back to creating a temporary TaskManager + * if none is available. The TaskManager.process() method handles all event types + * (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent). *

*

* If persistence fails, the exception is propagated to processEvent() which distributes an @@ -291,21 +290,32 @@ private void processEvent(MainEventBusContext context) { * * @param taskId the task ID * @param event the event to persist + * @param isReplicated whether this is a replicated event + * @param mainQueue the main queue for this task * @return true if the task reached a final state, false otherwise * @throws InternalError if persistence fails */ - private boolean updateTaskStore(String taskId, Event event, boolean isReplicated) throws InternalError { + private boolean updateTaskStore(String taskId, Event event, boolean isReplicated, EventQueue.MainQueue mainQueue) throws InternalError { try { // Extract contextId from event (all relevant events have it) String contextId = extractContextId(event); - // Create temporary TaskManager instance for this event - TaskManager taskManager = new TaskManager(taskId, contextId, taskStore, null); + // Try to get TaskManager from MainQueue (preserves initial message for new tasks) + // Falls back to creating a temporary one for existing tasks + TaskManager taskManager = mainQueue.getInitialMessageTaskManager(); + if (taskManager == null) { + // Create temporary TaskManager instance for this event + taskManager = new TaskManager(taskId, contextId, taskStore, null); + LOGGER.debug("Created temporary TaskManager for task {} (no TaskManager on queue)", taskId); + } else { + LOGGER.debug("Reusing TaskManager from queue for task {} (preserves initial message)", taskId); + } // Use TaskManager.process() - handles all event types with existing logic boolean isFinal = taskManager.process(event, isReplicated); LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {} (final: {}, replicated: {})", taskId, event.getClass().getSimpleName(), isFinal, isReplicated); + return isFinal; } catch (TaskSerializationException e) { diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java index 0e3aab7f4..c7f8fb81f 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java @@ -66,7 +66,6 @@ import org.a2aproject.sdk.spec.TaskPushNotificationConfig; import org.a2aproject.sdk.spec.TaskQueryParams; import org.a2aproject.sdk.spec.TaskState; -import org.a2aproject.sdk.spec.TaskStatusUpdateEvent; import org.a2aproject.sdk.spec.UnsupportedOperationError; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; @@ -449,6 +448,14 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // Create queue with real taskId (no tempId parameter needed) EventQueue queue = queueManager.createOrTap(queueTaskId); + + // For NEW tasks, associate the TaskManager with the queue + // This allows MainEventBusProcessor to reuse it, preserving the initial message + if (mss.task() == null) { + queue.setInitialMessageTaskManager(mss.taskManager); + LOGGER.debug("Set TaskManager on queue for new task {}", queueTaskId); + } + final java.util.concurrent.atomic.AtomicReference<@NonNull String> taskId = new java.util.concurrent.atomic.AtomicReference<>(queueTaskId); ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor, eventConsumerExecutor); @@ -649,6 +656,13 @@ public Flow.Publisher onMessageSendStream( EventQueue queue = queueManager.createOrTap(queueTaskId); LOGGER.debug("Created/tapped queue for task {}: {}", taskId.get(), queue); + // For NEW tasks, associate the TaskManager with the queue + // This allows MainEventBusProcessor to reuse it, preserving the initial message + if (mss.task() == null) { + queue.setInitialMessageTaskManager(mss.taskManager); + LOGGER.debug("Set TaskManager on queue for new task {} (streaming)", queueTaskId); + } + // Store push notification config SYNCHRONOUSLY for new tasks before agent starts // This ensures config is available when MainEventBusProcessor sends push notifications // For existing tasks, config is stored in initMessageSend() diff --git a/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandlerTest.java b/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandlerTest.java index 8914586f5..cd7f4c3a6 100644 --- a/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandlerTest.java +++ b/server-common/src/test/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandlerTest.java @@ -799,6 +799,59 @@ void testSendMessageStream_WithNonExistentTaskId_ThrowsTaskNotFoundError() { "Expected TaskNotFoundError when onMessageSendStream references a non-existent taskId"); } + /** + * Test that initial message is preserved in task history when agent submits task. + * Verifies that MainEventBusProcessor reuses TaskManager from queue (with initial message) + * rather than creating a new one without it. + */ + @Test + void testNewTask_InitialMessagePreservedInHistory() throws Exception { + // Arrange: Create an initial message + CountDownLatch agentCompleted = new CountDownLatch(1); + + agentExecutorExecute = (context, emitter) -> { + // Submit a task - should preserve initial message in history + emitter.submit(); + emitter.complete(); + agentCompleted.countDown(); + }; + + Message initialMessage = Message.builder() + .messageId("msg-initial-123") + .role(Message.Role.ROLE_USER) + .parts(new TextPart("Hello, please do something")) + .build(); + + MessageSendParams params = MessageSendParams.builder() + .message(initialMessage) + .configuration(DEFAULT_CONFIG) + .build(); + + // Act: Send message (non-streaming) + EventKind eventKind = requestHandler.onMessageSend(params, NULL_CONTEXT); + + // Assert: Task returned + assertNotNull(eventKind); + assertInstanceOf(Task.class, eventKind); + Task result = (Task) eventKind; + + // Wait for agent to complete + assertTrue(agentCompleted.await(5, TimeUnit.SECONDS), "Agent should complete"); + Thread.sleep(200); // Allow MainEventBusProcessor to persist + + // Verify task in TaskStore has initial message in history + Task storedTask = taskStore.get(result.id()); + assertNotNull(storedTask, "Task should be in TaskStore"); + assertNotNull(storedTask.history(), "Task should have history"); + assertFalse(storedTask.history().isEmpty(), "Task history should not be empty"); + + // The initial message should be in the history + boolean foundInitialMessage = storedTask.history().stream() + .anyMatch(msg -> "msg-initial-123".equals(msg.messageId())); + assertTrue(foundInitialMessage, + "Initial message should be in task history, but history was: " + storedTask.history()); + } + /** * Verification for Codex adversarial review finding: * When a follow-up message includes taskId but omits contextId,