feat: preserve initial message in task history by reusing TaskManager#801
feat: preserve initial message in task history by reusing TaskManager#801kabir wants to merge 1 commit intoa2aproject:mainfrom
Conversation
Pass TaskManager from DefaultRequestHandler to MainEventBusProcessor via EventQueue, allowing the initial message to be included in task history when the agent submits the task. Clear TaskManager from MainQueue on ChildQueue close to prevent memory leak, as MainQueue can live for a long time but TaskManager is only needed for the first event. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a mechanism to preserve the initial message in a task's history by reusing the TaskManager instance created during the initial request. This is implemented by associating the TaskManager with the EventQueue and retrieving it within the MainEventBusProcessor. Feedback indicates a potential race condition where the TaskManager could be cleared from the queue prematurely in non-blocking scenarios. It is recommended to move the logic for clearing the TaskManager to the MainEventBusProcessor after the first task-related event is successfully processed to ensure availability and prevent memory leaks.
| // MainQueue can live for a long time, but TaskManager is only needed for first message | ||
| // Safe to do for all ChildQueues - clearing null is a no-op for resubscriptions | ||
| parent.setInitialMessageTaskManager(null); | ||
| LOGGER.debug("Cleared TaskManager from MainQueue on ChildQueue close"); | ||
|
|
There was a problem hiding this comment.
Clearing the TaskManager here introduces a race condition for non-blocking requests (returnImmediately=true). In DefaultRequestHandler.onMessageSend, the ChildQueue is closed immediately in the finally block (line 617). If the MainEventBusProcessor hasn't processed the first event yet, the TaskManager (and thus the initial message) will be lost before it can be used. It is safer to clear the TaskManager in MainEventBusProcessor after its first successful use, or in MainQueue.doClose as a final cleanup.
| // Try to get TaskManager from MainQueue (preserves initial message for new tasks) | ||
| // Falls back to creating a temporary one for existing tasks | ||
| TaskManager taskManager = mainQueue.getInitialMessageTaskManager(); | ||
| if (taskManager == null) { | ||
| // Create temporary TaskManager instance for this event | ||
| taskManager = new TaskManager(taskId, contextId, taskStore, null); | ||
| LOGGER.debug("Created temporary TaskManager for task {} (no TaskManager on queue)", taskId); | ||
| } else { | ||
| LOGGER.debug("Reusing TaskManager from queue for task {} (preserves initial message)", taskId); | ||
| } |
There was a problem hiding this comment.
To prevent memory leaks and ensure the TaskManager is only used for the initial event, it should be cleared from the queue after its first task-related event is processed. This also avoids the race condition identified in ChildQueue.doClose for non-blocking requests.
| // Try to get TaskManager from MainQueue (preserves initial message for new tasks) | |
| // Falls back to creating a temporary one for existing tasks | |
| TaskManager taskManager = mainQueue.getInitialMessageTaskManager(); | |
| if (taskManager == null) { | |
| // Create temporary TaskManager instance for this event | |
| taskManager = new TaskManager(taskId, contextId, taskStore, null); | |
| LOGGER.debug("Created temporary TaskManager for task {} (no TaskManager on queue)", taskId); | |
| } else { | |
| LOGGER.debug("Reusing TaskManager from queue for task {} (preserves initial message)", taskId); | |
| } | |
| // Try to get TaskManager from MainQueue (preserves initial message for new tasks) | |
| // Falls back to creating a temporary one for existing tasks | |
| TaskManager taskManager = mainQueue.getInitialMessageTaskManager(); | |
| boolean isInitialTaskManager = taskManager != null; | |
| if (taskManager == null) { | |
| // Create temporary TaskManager instance for this event | |
| taskManager = new TaskManager(taskId, contextId, taskStore, null); | |
| LOGGER.debug("Created temporary TaskManager for task {} (no TaskManager on queue)", taskId); | |
| } else { | |
| LOGGER.debug("Reusing TaskManager from queue for task {} (preserves initial message)", taskId); | |
| } | |
| // Use TaskManager.process() - handles all event types with existing logic | |
| boolean isFinal = taskManager.process(event, isReplicated); | |
| // Clear TaskManager from queue after first task-related event to prevent memory leak | |
| if (isInitialTaskManager && (event instanceof Task || event instanceof TaskStatusUpdateEvent || | |
| event instanceof TaskArtifactUpdateEvent || event instanceof A2AError)) { | |
| mainQueue.setInitialMessageTaskManager(null); | |
| } |
Pass TaskManager from DefaultRequestHandler to MainEventBusProcessor via EventQueue, allowing the initial message to be included in task history when the agent submits the task.
Clear TaskManager from MainQueue on ChildQueue close to prevent memory leak, as MainQueue can live for a long time but TaskManager is only needed for the first event.